##// END OF EJS Templates
wireproto: do not abort after successful lookup...
Kyle Lippincott -
r34064:6c6169f7 default
parent child Browse files
Show More
@@ -1,1059 +1,1060
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import os
11 import os
12 import tempfile
12 import tempfile
13
13
14 from .i18n import _
14 from .i18n import _
15 from .node import (
15 from .node import (
16 bin,
16 bin,
17 hex,
17 hex,
18 nullid,
18 nullid,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 encoding,
24 encoding,
25 error,
25 error,
26 exchange,
26 exchange,
27 peer,
27 peer,
28 pushkey as pushkeymod,
28 pushkey as pushkeymod,
29 pycompat,
29 pycompat,
30 repository,
30 repository,
31 streamclone,
31 streamclone,
32 util,
32 util,
33 )
33 )
34
34
35 urlerr = util.urlerr
35 urlerr = util.urlerr
36 urlreq = util.urlreq
36 urlreq = util.urlreq
37
37
38 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
38 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
39 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
39 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
40 'IncompatibleClient')
40 'IncompatibleClient')
41 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
41 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
42
42
43 class abstractserverproto(object):
43 class abstractserverproto(object):
44 """abstract class that summarizes the protocol API
44 """abstract class that summarizes the protocol API
45
45
46 Used as reference and documentation.
46 Used as reference and documentation.
47 """
47 """
48
48
49 def getargs(self, args):
49 def getargs(self, args):
50 """return the value for arguments in <args>
50 """return the value for arguments in <args>
51
51
52 returns a list of values (same order as <args>)"""
52 returns a list of values (same order as <args>)"""
53 raise NotImplementedError()
53 raise NotImplementedError()
54
54
55 def getfile(self, fp):
55 def getfile(self, fp):
56 """write the whole content of a file into a file like object
56 """write the whole content of a file into a file like object
57
57
58 The file is in the form::
58 The file is in the form::
59
59
60 (<chunk-size>\n<chunk>)+0\n
60 (<chunk-size>\n<chunk>)+0\n
61
61
62 chunk size is the ascii version of the int.
62 chunk size is the ascii version of the int.
63 """
63 """
64 raise NotImplementedError()
64 raise NotImplementedError()
65
65
66 def redirect(self):
66 def redirect(self):
67 """may setup interception for stdout and stderr
67 """may setup interception for stdout and stderr
68
68
69 See also the `restore` method."""
69 See also the `restore` method."""
70 raise NotImplementedError()
70 raise NotImplementedError()
71
71
72 # If the `redirect` function does install interception, the `restore`
72 # If the `redirect` function does install interception, the `restore`
73 # function MUST be defined. If interception is not used, this function
73 # function MUST be defined. If interception is not used, this function
74 # MUST NOT be defined.
74 # MUST NOT be defined.
75 #
75 #
76 # left commented here on purpose
76 # left commented here on purpose
77 #
77 #
78 #def restore(self):
78 #def restore(self):
79 # """reinstall previous stdout and stderr and return intercepted stdout
79 # """reinstall previous stdout and stderr and return intercepted stdout
80 # """
80 # """
81 # raise NotImplementedError()
81 # raise NotImplementedError()
82
82
83 class remoteiterbatcher(peer.iterbatcher):
83 class remoteiterbatcher(peer.iterbatcher):
84 def __init__(self, remote):
84 def __init__(self, remote):
85 super(remoteiterbatcher, self).__init__()
85 super(remoteiterbatcher, self).__init__()
86 self._remote = remote
86 self._remote = remote
87
87
88 def __getattr__(self, name):
88 def __getattr__(self, name):
89 # Validate this method is batchable, since submit() only supports
89 # Validate this method is batchable, since submit() only supports
90 # batchable methods.
90 # batchable methods.
91 fn = getattr(self._remote, name)
91 fn = getattr(self._remote, name)
92 if not getattr(fn, 'batchable', None):
92 if not getattr(fn, 'batchable', None):
93 raise error.ProgrammingError('Attempted to batch a non-batchable '
93 raise error.ProgrammingError('Attempted to batch a non-batchable '
94 'call to %r' % name)
94 'call to %r' % name)
95
95
96 return super(remoteiterbatcher, self).__getattr__(name)
96 return super(remoteiterbatcher, self).__getattr__(name)
97
97
98 def submit(self):
98 def submit(self):
99 """Break the batch request into many patch calls and pipeline them.
99 """Break the batch request into many patch calls and pipeline them.
100
100
101 This is mostly valuable over http where request sizes can be
101 This is mostly valuable over http where request sizes can be
102 limited, but can be used in other places as well.
102 limited, but can be used in other places as well.
103 """
103 """
104 # 2-tuple of (command, arguments) that represents what will be
104 # 2-tuple of (command, arguments) that represents what will be
105 # sent over the wire.
105 # sent over the wire.
106 requests = []
106 requests = []
107
107
108 # 4-tuple of (command, final future, @batchable generator, remote
108 # 4-tuple of (command, final future, @batchable generator, remote
109 # future).
109 # future).
110 results = []
110 results = []
111
111
112 for command, args, opts, finalfuture in self.calls:
112 for command, args, opts, finalfuture in self.calls:
113 mtd = getattr(self._remote, command)
113 mtd = getattr(self._remote, command)
114 batchable = mtd.batchable(mtd.im_self, *args, **opts)
114 batchable = mtd.batchable(mtd.im_self, *args, **opts)
115
115
116 commandargs, fremote = next(batchable)
116 commandargs, fremote = next(batchable)
117 assert fremote
117 assert fremote
118 requests.append((command, commandargs))
118 requests.append((command, commandargs))
119 results.append((command, finalfuture, batchable, fremote))
119 results.append((command, finalfuture, batchable, fremote))
120
120
121 if requests:
121 if requests:
122 self._resultiter = self._remote._submitbatch(requests)
122 self._resultiter = self._remote._submitbatch(requests)
123
123
124 self._results = results
124 self._results = results
125
125
126 def results(self):
126 def results(self):
127 for command, finalfuture, batchable, remotefuture in self._results:
127 for command, finalfuture, batchable, remotefuture in self._results:
128 # Get the raw result, set it in the remote future, feed it
128 # Get the raw result, set it in the remote future, feed it
129 # back into the @batchable generator so it can be decoded, and
129 # back into the @batchable generator so it can be decoded, and
130 # set the result on the final future to this value.
130 # set the result on the final future to this value.
131 remoteresult = next(self._resultiter)
131 remoteresult = next(self._resultiter)
132 remotefuture.set(remoteresult)
132 remotefuture.set(remoteresult)
133 finalfuture.set(next(batchable))
133 finalfuture.set(next(batchable))
134
134
135 # Verify our @batchable generators only emit 2 values.
135 # Verify our @batchable generators only emit 2 values.
136 try:
136 try:
137 next(batchable)
137 next(batchable)
138 except StopIteration:
138 except StopIteration:
139 pass
139 pass
140 else:
140 else:
141 raise error.ProgrammingError('%s @batchable generator emitted '
141 raise error.ProgrammingError('%s @batchable generator emitted '
142 'unexpected value count' % command)
142 'unexpected value count' % command)
143
143
144 yield finalfuture.value
144 yield finalfuture.value
145
145
146 # Forward a couple of names from peer to make wireproto interactions
146 # Forward a couple of names from peer to make wireproto interactions
147 # slightly more sensible.
147 # slightly more sensible.
148 batchable = peer.batchable
148 batchable = peer.batchable
149 future = peer.future
149 future = peer.future
150
150
151 # list of nodes encoding / decoding
151 # list of nodes encoding / decoding
152
152
153 def decodelist(l, sep=' '):
153 def decodelist(l, sep=' '):
154 if l:
154 if l:
155 return map(bin, l.split(sep))
155 return map(bin, l.split(sep))
156 return []
156 return []
157
157
158 def encodelist(l, sep=' '):
158 def encodelist(l, sep=' '):
159 try:
159 try:
160 return sep.join(map(hex, l))
160 return sep.join(map(hex, l))
161 except TypeError:
161 except TypeError:
162 raise
162 raise
163
163
164 # batched call argument encoding
164 # batched call argument encoding
165
165
166 def escapearg(plain):
166 def escapearg(plain):
167 return (plain
167 return (plain
168 .replace(':', ':c')
168 .replace(':', ':c')
169 .replace(',', ':o')
169 .replace(',', ':o')
170 .replace(';', ':s')
170 .replace(';', ':s')
171 .replace('=', ':e'))
171 .replace('=', ':e'))
172
172
173 def unescapearg(escaped):
173 def unescapearg(escaped):
174 return (escaped
174 return (escaped
175 .replace(':e', '=')
175 .replace(':e', '=')
176 .replace(':s', ';')
176 .replace(':s', ';')
177 .replace(':o', ',')
177 .replace(':o', ',')
178 .replace(':c', ':'))
178 .replace(':c', ':'))
179
179
180 def encodebatchcmds(req):
180 def encodebatchcmds(req):
181 """Return a ``cmds`` argument value for the ``batch`` command."""
181 """Return a ``cmds`` argument value for the ``batch`` command."""
182 cmds = []
182 cmds = []
183 for op, argsdict in req:
183 for op, argsdict in req:
184 # Old servers didn't properly unescape argument names. So prevent
184 # Old servers didn't properly unescape argument names. So prevent
185 # the sending of argument names that may not be decoded properly by
185 # the sending of argument names that may not be decoded properly by
186 # servers.
186 # servers.
187 assert all(escapearg(k) == k for k in argsdict)
187 assert all(escapearg(k) == k for k in argsdict)
188
188
189 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
189 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
190 for k, v in argsdict.iteritems())
190 for k, v in argsdict.iteritems())
191 cmds.append('%s %s' % (op, args))
191 cmds.append('%s %s' % (op, args))
192
192
193 return ';'.join(cmds)
193 return ';'.join(cmds)
194
194
195 # mapping of options accepted by getbundle and their types
195 # mapping of options accepted by getbundle and their types
196 #
196 #
197 # Meant to be extended by extensions. It is extensions responsibility to ensure
197 # Meant to be extended by extensions. It is extensions responsibility to ensure
198 # such options are properly processed in exchange.getbundle.
198 # such options are properly processed in exchange.getbundle.
199 #
199 #
200 # supported types are:
200 # supported types are:
201 #
201 #
202 # :nodes: list of binary nodes
202 # :nodes: list of binary nodes
203 # :csv: list of comma-separated values
203 # :csv: list of comma-separated values
204 # :scsv: list of comma-separated values return as set
204 # :scsv: list of comma-separated values return as set
205 # :plain: string with no transformation needed.
205 # :plain: string with no transformation needed.
206 gboptsmap = {'heads': 'nodes',
206 gboptsmap = {'heads': 'nodes',
207 'common': 'nodes',
207 'common': 'nodes',
208 'obsmarkers': 'boolean',
208 'obsmarkers': 'boolean',
209 'bundlecaps': 'scsv',
209 'bundlecaps': 'scsv',
210 'listkeys': 'csv',
210 'listkeys': 'csv',
211 'cg': 'boolean',
211 'cg': 'boolean',
212 'cbattempted': 'boolean'}
212 'cbattempted': 'boolean'}
213
213
214 # client side
214 # client side
215
215
216 class wirepeer(repository.legacypeer):
216 class wirepeer(repository.legacypeer):
217 """Client-side interface for communicating with a peer repository.
217 """Client-side interface for communicating with a peer repository.
218
218
219 Methods commonly call wire protocol commands of the same name.
219 Methods commonly call wire protocol commands of the same name.
220
220
221 See also httppeer.py and sshpeer.py for protocol-specific
221 See also httppeer.py and sshpeer.py for protocol-specific
222 implementations of this interface.
222 implementations of this interface.
223 """
223 """
224 # Begin of basewirepeer interface.
224 # Begin of basewirepeer interface.
225
225
226 def iterbatch(self):
226 def iterbatch(self):
227 return remoteiterbatcher(self)
227 return remoteiterbatcher(self)
228
228
229 @batchable
229 @batchable
230 def lookup(self, key):
230 def lookup(self, key):
231 self.requirecap('lookup', _('look up remote revision'))
231 self.requirecap('lookup', _('look up remote revision'))
232 f = future()
232 f = future()
233 yield {'key': encoding.fromlocal(key)}, f
233 yield {'key': encoding.fromlocal(key)}, f
234 d = f.value
234 d = f.value
235 success, data = d[:-1].split(" ", 1)
235 success, data = d[:-1].split(" ", 1)
236 if int(success):
236 if int(success):
237 yield bin(data)
237 yield bin(data)
238 self._abort(error.RepoError(data))
238 else:
239 self._abort(error.RepoError(data))
239
240
240 @batchable
241 @batchable
241 def heads(self):
242 def heads(self):
242 f = future()
243 f = future()
243 yield {}, f
244 yield {}, f
244 d = f.value
245 d = f.value
245 try:
246 try:
246 yield decodelist(d[:-1])
247 yield decodelist(d[:-1])
247 except ValueError:
248 except ValueError:
248 self._abort(error.ResponseError(_("unexpected response:"), d))
249 self._abort(error.ResponseError(_("unexpected response:"), d))
249
250
250 @batchable
251 @batchable
251 def known(self, nodes):
252 def known(self, nodes):
252 f = future()
253 f = future()
253 yield {'nodes': encodelist(nodes)}, f
254 yield {'nodes': encodelist(nodes)}, f
254 d = f.value
255 d = f.value
255 try:
256 try:
256 yield [bool(int(b)) for b in d]
257 yield [bool(int(b)) for b in d]
257 except ValueError:
258 except ValueError:
258 self._abort(error.ResponseError(_("unexpected response:"), d))
259 self._abort(error.ResponseError(_("unexpected response:"), d))
259
260
260 @batchable
261 @batchable
261 def branchmap(self):
262 def branchmap(self):
262 f = future()
263 f = future()
263 yield {}, f
264 yield {}, f
264 d = f.value
265 d = f.value
265 try:
266 try:
266 branchmap = {}
267 branchmap = {}
267 for branchpart in d.splitlines():
268 for branchpart in d.splitlines():
268 branchname, branchheads = branchpart.split(' ', 1)
269 branchname, branchheads = branchpart.split(' ', 1)
269 branchname = encoding.tolocal(urlreq.unquote(branchname))
270 branchname = encoding.tolocal(urlreq.unquote(branchname))
270 branchheads = decodelist(branchheads)
271 branchheads = decodelist(branchheads)
271 branchmap[branchname] = branchheads
272 branchmap[branchname] = branchheads
272 yield branchmap
273 yield branchmap
273 except TypeError:
274 except TypeError:
274 self._abort(error.ResponseError(_("unexpected response:"), d))
275 self._abort(error.ResponseError(_("unexpected response:"), d))
275
276
276 @batchable
277 @batchable
277 def listkeys(self, namespace):
278 def listkeys(self, namespace):
278 if not self.capable('pushkey'):
279 if not self.capable('pushkey'):
279 yield {}, None
280 yield {}, None
280 f = future()
281 f = future()
281 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
282 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
282 yield {'namespace': encoding.fromlocal(namespace)}, f
283 yield {'namespace': encoding.fromlocal(namespace)}, f
283 d = f.value
284 d = f.value
284 self.ui.debug('received listkey for "%s": %i bytes\n'
285 self.ui.debug('received listkey for "%s": %i bytes\n'
285 % (namespace, len(d)))
286 % (namespace, len(d)))
286 yield pushkeymod.decodekeys(d)
287 yield pushkeymod.decodekeys(d)
287
288
288 @batchable
289 @batchable
289 def pushkey(self, namespace, key, old, new):
290 def pushkey(self, namespace, key, old, new):
290 if not self.capable('pushkey'):
291 if not self.capable('pushkey'):
291 yield False, None
292 yield False, None
292 f = future()
293 f = future()
293 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
294 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
294 yield {'namespace': encoding.fromlocal(namespace),
295 yield {'namespace': encoding.fromlocal(namespace),
295 'key': encoding.fromlocal(key),
296 'key': encoding.fromlocal(key),
296 'old': encoding.fromlocal(old),
297 'old': encoding.fromlocal(old),
297 'new': encoding.fromlocal(new)}, f
298 'new': encoding.fromlocal(new)}, f
298 d = f.value
299 d = f.value
299 d, output = d.split('\n', 1)
300 d, output = d.split('\n', 1)
300 try:
301 try:
301 d = bool(int(d))
302 d = bool(int(d))
302 except ValueError:
303 except ValueError:
303 raise error.ResponseError(
304 raise error.ResponseError(
304 _('push failed (unexpected response):'), d)
305 _('push failed (unexpected response):'), d)
305 for l in output.splitlines(True):
306 for l in output.splitlines(True):
306 self.ui.status(_('remote: '), l)
307 self.ui.status(_('remote: '), l)
307 yield d
308 yield d
308
309
309 def stream_out(self):
310 def stream_out(self):
310 return self._callstream('stream_out')
311 return self._callstream('stream_out')
311
312
312 def getbundle(self, source, **kwargs):
313 def getbundle(self, source, **kwargs):
313 self.requirecap('getbundle', _('look up remote changes'))
314 self.requirecap('getbundle', _('look up remote changes'))
314 opts = {}
315 opts = {}
315 bundlecaps = kwargs.get('bundlecaps')
316 bundlecaps = kwargs.get('bundlecaps')
316 if bundlecaps is not None:
317 if bundlecaps is not None:
317 kwargs['bundlecaps'] = sorted(bundlecaps)
318 kwargs['bundlecaps'] = sorted(bundlecaps)
318 else:
319 else:
319 bundlecaps = () # kwargs could have it to None
320 bundlecaps = () # kwargs could have it to None
320 for key, value in kwargs.iteritems():
321 for key, value in kwargs.iteritems():
321 if value is None:
322 if value is None:
322 continue
323 continue
323 keytype = gboptsmap.get(key)
324 keytype = gboptsmap.get(key)
324 if keytype is None:
325 if keytype is None:
325 assert False, 'unexpected'
326 assert False, 'unexpected'
326 elif keytype == 'nodes':
327 elif keytype == 'nodes':
327 value = encodelist(value)
328 value = encodelist(value)
328 elif keytype in ('csv', 'scsv'):
329 elif keytype in ('csv', 'scsv'):
329 value = ','.join(value)
330 value = ','.join(value)
330 elif keytype == 'boolean':
331 elif keytype == 'boolean':
331 value = '%i' % bool(value)
332 value = '%i' % bool(value)
332 elif keytype != 'plain':
333 elif keytype != 'plain':
333 raise KeyError('unknown getbundle option type %s'
334 raise KeyError('unknown getbundle option type %s'
334 % keytype)
335 % keytype)
335 opts[key] = value
336 opts[key] = value
336 f = self._callcompressable("getbundle", **opts)
337 f = self._callcompressable("getbundle", **opts)
337 if any((cap.startswith('HG2') for cap in bundlecaps)):
338 if any((cap.startswith('HG2') for cap in bundlecaps)):
338 return bundle2.getunbundler(self.ui, f)
339 return bundle2.getunbundler(self.ui, f)
339 else:
340 else:
340 return changegroupmod.cg1unpacker(f, 'UN')
341 return changegroupmod.cg1unpacker(f, 'UN')
341
342
342 def unbundle(self, cg, heads, url):
343 def unbundle(self, cg, heads, url):
343 '''Send cg (a readable file-like object representing the
344 '''Send cg (a readable file-like object representing the
344 changegroup to push, typically a chunkbuffer object) to the
345 changegroup to push, typically a chunkbuffer object) to the
345 remote server as a bundle.
346 remote server as a bundle.
346
347
347 When pushing a bundle10 stream, return an integer indicating the
348 When pushing a bundle10 stream, return an integer indicating the
348 result of the push (see changegroup.apply()).
349 result of the push (see changegroup.apply()).
349
350
350 When pushing a bundle20 stream, return a bundle20 stream.
351 When pushing a bundle20 stream, return a bundle20 stream.
351
352
352 `url` is the url the client thinks it's pushing to, which is
353 `url` is the url the client thinks it's pushing to, which is
353 visible to hooks.
354 visible to hooks.
354 '''
355 '''
355
356
356 if heads != ['force'] and self.capable('unbundlehash'):
357 if heads != ['force'] and self.capable('unbundlehash'):
357 heads = encodelist(['hashed',
358 heads = encodelist(['hashed',
358 hashlib.sha1(''.join(sorted(heads))).digest()])
359 hashlib.sha1(''.join(sorted(heads))).digest()])
359 else:
360 else:
360 heads = encodelist(heads)
361 heads = encodelist(heads)
361
362
362 if util.safehasattr(cg, 'deltaheader'):
363 if util.safehasattr(cg, 'deltaheader'):
363 # this a bundle10, do the old style call sequence
364 # this a bundle10, do the old style call sequence
364 ret, output = self._callpush("unbundle", cg, heads=heads)
365 ret, output = self._callpush("unbundle", cg, heads=heads)
365 if ret == "":
366 if ret == "":
366 raise error.ResponseError(
367 raise error.ResponseError(
367 _('push failed:'), output)
368 _('push failed:'), output)
368 try:
369 try:
369 ret = int(ret)
370 ret = int(ret)
370 except ValueError:
371 except ValueError:
371 raise error.ResponseError(
372 raise error.ResponseError(
372 _('push failed (unexpected response):'), ret)
373 _('push failed (unexpected response):'), ret)
373
374
374 for l in output.splitlines(True):
375 for l in output.splitlines(True):
375 self.ui.status(_('remote: '), l)
376 self.ui.status(_('remote: '), l)
376 else:
377 else:
377 # bundle2 push. Send a stream, fetch a stream.
378 # bundle2 push. Send a stream, fetch a stream.
378 stream = self._calltwowaystream('unbundle', cg, heads=heads)
379 stream = self._calltwowaystream('unbundle', cg, heads=heads)
379 ret = bundle2.getunbundler(self.ui, stream)
380 ret = bundle2.getunbundler(self.ui, stream)
380 return ret
381 return ret
381
382
382 # End of basewirepeer interface.
383 # End of basewirepeer interface.
383
384
384 # Begin of baselegacywirepeer interface.
385 # Begin of baselegacywirepeer interface.
385
386
386 def branches(self, nodes):
387 def branches(self, nodes):
387 n = encodelist(nodes)
388 n = encodelist(nodes)
388 d = self._call("branches", nodes=n)
389 d = self._call("branches", nodes=n)
389 try:
390 try:
390 br = [tuple(decodelist(b)) for b in d.splitlines()]
391 br = [tuple(decodelist(b)) for b in d.splitlines()]
391 return br
392 return br
392 except ValueError:
393 except ValueError:
393 self._abort(error.ResponseError(_("unexpected response:"), d))
394 self._abort(error.ResponseError(_("unexpected response:"), d))
394
395
395 def between(self, pairs):
396 def between(self, pairs):
396 batch = 8 # avoid giant requests
397 batch = 8 # avoid giant requests
397 r = []
398 r = []
398 for i in xrange(0, len(pairs), batch):
399 for i in xrange(0, len(pairs), batch):
399 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
400 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
400 d = self._call("between", pairs=n)
401 d = self._call("between", pairs=n)
401 try:
402 try:
402 r.extend(l and decodelist(l) or [] for l in d.splitlines())
403 r.extend(l and decodelist(l) or [] for l in d.splitlines())
403 except ValueError:
404 except ValueError:
404 self._abort(error.ResponseError(_("unexpected response:"), d))
405 self._abort(error.ResponseError(_("unexpected response:"), d))
405 return r
406 return r
406
407
407 def changegroup(self, nodes, kind):
408 def changegroup(self, nodes, kind):
408 n = encodelist(nodes)
409 n = encodelist(nodes)
409 f = self._callcompressable("changegroup", roots=n)
410 f = self._callcompressable("changegroup", roots=n)
410 return changegroupmod.cg1unpacker(f, 'UN')
411 return changegroupmod.cg1unpacker(f, 'UN')
411
412
412 def changegroupsubset(self, bases, heads, kind):
413 def changegroupsubset(self, bases, heads, kind):
413 self.requirecap('changegroupsubset', _('look up remote changes'))
414 self.requirecap('changegroupsubset', _('look up remote changes'))
414 bases = encodelist(bases)
415 bases = encodelist(bases)
415 heads = encodelist(heads)
416 heads = encodelist(heads)
416 f = self._callcompressable("changegroupsubset",
417 f = self._callcompressable("changegroupsubset",
417 bases=bases, heads=heads)
418 bases=bases, heads=heads)
418 return changegroupmod.cg1unpacker(f, 'UN')
419 return changegroupmod.cg1unpacker(f, 'UN')
419
420
420 # End of baselegacywirepeer interface.
421 # End of baselegacywirepeer interface.
421
422
422 def _submitbatch(self, req):
423 def _submitbatch(self, req):
423 """run batch request <req> on the server
424 """run batch request <req> on the server
424
425
425 Returns an iterator of the raw responses from the server.
426 Returns an iterator of the raw responses from the server.
426 """
427 """
427 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
428 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
428 chunk = rsp.read(1024)
429 chunk = rsp.read(1024)
429 work = [chunk]
430 work = [chunk]
430 while chunk:
431 while chunk:
431 while ';' not in chunk and chunk:
432 while ';' not in chunk and chunk:
432 chunk = rsp.read(1024)
433 chunk = rsp.read(1024)
433 work.append(chunk)
434 work.append(chunk)
434 merged = ''.join(work)
435 merged = ''.join(work)
435 while ';' in merged:
436 while ';' in merged:
436 one, merged = merged.split(';', 1)
437 one, merged = merged.split(';', 1)
437 yield unescapearg(one)
438 yield unescapearg(one)
438 chunk = rsp.read(1024)
439 chunk = rsp.read(1024)
439 work = [merged, chunk]
440 work = [merged, chunk]
440 yield unescapearg(''.join(work))
441 yield unescapearg(''.join(work))
441
442
442 def _submitone(self, op, args):
443 def _submitone(self, op, args):
443 return self._call(op, **args)
444 return self._call(op, **args)
444
445
445 def debugwireargs(self, one, two, three=None, four=None, five=None):
446 def debugwireargs(self, one, two, three=None, four=None, five=None):
446 # don't pass optional arguments left at their default value
447 # don't pass optional arguments left at their default value
447 opts = {}
448 opts = {}
448 if three is not None:
449 if three is not None:
449 opts['three'] = three
450 opts['three'] = three
450 if four is not None:
451 if four is not None:
451 opts['four'] = four
452 opts['four'] = four
452 return self._call('debugwireargs', one=one, two=two, **opts)
453 return self._call('debugwireargs', one=one, two=two, **opts)
453
454
454 def _call(self, cmd, **args):
455 def _call(self, cmd, **args):
455 """execute <cmd> on the server
456 """execute <cmd> on the server
456
457
457 The command is expected to return a simple string.
458 The command is expected to return a simple string.
458
459
459 returns the server reply as a string."""
460 returns the server reply as a string."""
460 raise NotImplementedError()
461 raise NotImplementedError()
461
462
462 def _callstream(self, cmd, **args):
463 def _callstream(self, cmd, **args):
463 """execute <cmd> on the server
464 """execute <cmd> on the server
464
465
465 The command is expected to return a stream. Note that if the
466 The command is expected to return a stream. Note that if the
466 command doesn't return a stream, _callstream behaves
467 command doesn't return a stream, _callstream behaves
467 differently for ssh and http peers.
468 differently for ssh and http peers.
468
469
469 returns the server reply as a file like object.
470 returns the server reply as a file like object.
470 """
471 """
471 raise NotImplementedError()
472 raise NotImplementedError()
472
473
473 def _callcompressable(self, cmd, **args):
474 def _callcompressable(self, cmd, **args):
474 """execute <cmd> on the server
475 """execute <cmd> on the server
475
476
476 The command is expected to return a stream.
477 The command is expected to return a stream.
477
478
478 The stream may have been compressed in some implementations. This
479 The stream may have been compressed in some implementations. This
479 function takes care of the decompression. This is the only difference
480 function takes care of the decompression. This is the only difference
480 with _callstream.
481 with _callstream.
481
482
482 returns the server reply as a file like object.
483 returns the server reply as a file like object.
483 """
484 """
484 raise NotImplementedError()
485 raise NotImplementedError()
485
486
486 def _callpush(self, cmd, fp, **args):
487 def _callpush(self, cmd, fp, **args):
487 """execute a <cmd> on server
488 """execute a <cmd> on server
488
489
489 The command is expected to be related to a push. Push has a special
490 The command is expected to be related to a push. Push has a special
490 return method.
491 return method.
491
492
492 returns the server reply as a (ret, output) tuple. ret is either
493 returns the server reply as a (ret, output) tuple. ret is either
493 empty (error) or a stringified int.
494 empty (error) or a stringified int.
494 """
495 """
495 raise NotImplementedError()
496 raise NotImplementedError()
496
497
497 def _calltwowaystream(self, cmd, fp, **args):
498 def _calltwowaystream(self, cmd, fp, **args):
498 """execute <cmd> on server
499 """execute <cmd> on server
499
500
500 The command will send a stream to the server and get a stream in reply.
501 The command will send a stream to the server and get a stream in reply.
501 """
502 """
502 raise NotImplementedError()
503 raise NotImplementedError()
503
504
504 def _abort(self, exception):
505 def _abort(self, exception):
505 """clearly abort the wire protocol connection and raise the exception
506 """clearly abort the wire protocol connection and raise the exception
506 """
507 """
507 raise NotImplementedError()
508 raise NotImplementedError()
508
509
509 # server side
510 # server side
510
511
511 # wire protocol command can either return a string or one of these classes.
512 # wire protocol command can either return a string or one of these classes.
512 class streamres(object):
513 class streamres(object):
513 """wireproto reply: binary stream
514 """wireproto reply: binary stream
514
515
515 The call was successful and the result is a stream.
516 The call was successful and the result is a stream.
516
517
517 Accepts either a generator or an object with a ``read(size)`` method.
518 Accepts either a generator or an object with a ``read(size)`` method.
518
519
519 ``v1compressible`` indicates whether this data can be compressed to
520 ``v1compressible`` indicates whether this data can be compressed to
520 "version 1" clients (technically: HTTP peers using
521 "version 1" clients (technically: HTTP peers using
521 application/mercurial-0.1 media type). This flag should NOT be used on
522 application/mercurial-0.1 media type). This flag should NOT be used on
522 new commands because new clients should support a more modern compression
523 new commands because new clients should support a more modern compression
523 mechanism.
524 mechanism.
524 """
525 """
525 def __init__(self, gen=None, reader=None, v1compressible=False):
526 def __init__(self, gen=None, reader=None, v1compressible=False):
526 self.gen = gen
527 self.gen = gen
527 self.reader = reader
528 self.reader = reader
528 self.v1compressible = v1compressible
529 self.v1compressible = v1compressible
529
530
530 class pushres(object):
531 class pushres(object):
531 """wireproto reply: success with simple integer return
532 """wireproto reply: success with simple integer return
532
533
533 The call was successful and returned an integer contained in `self.res`.
534 The call was successful and returned an integer contained in `self.res`.
534 """
535 """
535 def __init__(self, res):
536 def __init__(self, res):
536 self.res = res
537 self.res = res
537
538
538 class pusherr(object):
539 class pusherr(object):
539 """wireproto reply: failure
540 """wireproto reply: failure
540
541
541 The call failed. The `self.res` attribute contains the error message.
542 The call failed. The `self.res` attribute contains the error message.
542 """
543 """
543 def __init__(self, res):
544 def __init__(self, res):
544 self.res = res
545 self.res = res
545
546
546 class ooberror(object):
547 class ooberror(object):
547 """wireproto reply: failure of a batch of operation
548 """wireproto reply: failure of a batch of operation
548
549
549 Something failed during a batch call. The error message is stored in
550 Something failed during a batch call. The error message is stored in
550 `self.message`.
551 `self.message`.
551 """
552 """
552 def __init__(self, message):
553 def __init__(self, message):
553 self.message = message
554 self.message = message
554
555
555 def getdispatchrepo(repo, proto, command):
556 def getdispatchrepo(repo, proto, command):
556 """Obtain the repo used for processing wire protocol commands.
557 """Obtain the repo used for processing wire protocol commands.
557
558
558 The intent of this function is to serve as a monkeypatch point for
559 The intent of this function is to serve as a monkeypatch point for
559 extensions that need commands to operate on different repo views under
560 extensions that need commands to operate on different repo views under
560 specialized circumstances.
561 specialized circumstances.
561 """
562 """
562 return repo.filtered('served')
563 return repo.filtered('served')
563
564
564 def dispatch(repo, proto, command):
565 def dispatch(repo, proto, command):
565 repo = getdispatchrepo(repo, proto, command)
566 repo = getdispatchrepo(repo, proto, command)
566 func, spec = commands[command]
567 func, spec = commands[command]
567 args = proto.getargs(spec)
568 args = proto.getargs(spec)
568 return func(repo, proto, *args)
569 return func(repo, proto, *args)
569
570
570 def options(cmd, keys, others):
571 def options(cmd, keys, others):
571 opts = {}
572 opts = {}
572 for k in keys:
573 for k in keys:
573 if k in others:
574 if k in others:
574 opts[k] = others[k]
575 opts[k] = others[k]
575 del others[k]
576 del others[k]
576 if others:
577 if others:
577 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
578 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
578 % (cmd, ",".join(others)))
579 % (cmd, ",".join(others)))
579 return opts
580 return opts
580
581
581 def bundle1allowed(repo, action):
582 def bundle1allowed(repo, action):
582 """Whether a bundle1 operation is allowed from the server.
583 """Whether a bundle1 operation is allowed from the server.
583
584
584 Priority is:
585 Priority is:
585
586
586 1. server.bundle1gd.<action> (if generaldelta active)
587 1. server.bundle1gd.<action> (if generaldelta active)
587 2. server.bundle1.<action>
588 2. server.bundle1.<action>
588 3. server.bundle1gd (if generaldelta active)
589 3. server.bundle1gd (if generaldelta active)
589 4. server.bundle1
590 4. server.bundle1
590 """
591 """
591 ui = repo.ui
592 ui = repo.ui
592 gd = 'generaldelta' in repo.requirements
593 gd = 'generaldelta' in repo.requirements
593
594
594 if gd:
595 if gd:
595 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
596 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
596 if v is not None:
597 if v is not None:
597 return v
598 return v
598
599
599 v = ui.configbool('server', 'bundle1.%s' % action, None)
600 v = ui.configbool('server', 'bundle1.%s' % action, None)
600 if v is not None:
601 if v is not None:
601 return v
602 return v
602
603
603 if gd:
604 if gd:
604 v = ui.configbool('server', 'bundle1gd')
605 v = ui.configbool('server', 'bundle1gd')
605 if v is not None:
606 if v is not None:
606 return v
607 return v
607
608
608 return ui.configbool('server', 'bundle1')
609 return ui.configbool('server', 'bundle1')
609
610
610 def supportedcompengines(ui, proto, role):
611 def supportedcompengines(ui, proto, role):
611 """Obtain the list of supported compression engines for a request."""
612 """Obtain the list of supported compression engines for a request."""
612 assert role in (util.CLIENTROLE, util.SERVERROLE)
613 assert role in (util.CLIENTROLE, util.SERVERROLE)
613
614
614 compengines = util.compengines.supportedwireengines(role)
615 compengines = util.compengines.supportedwireengines(role)
615
616
616 # Allow config to override default list and ordering.
617 # Allow config to override default list and ordering.
617 if role == util.SERVERROLE:
618 if role == util.SERVERROLE:
618 configengines = ui.configlist('server', 'compressionengines')
619 configengines = ui.configlist('server', 'compressionengines')
619 config = 'server.compressionengines'
620 config = 'server.compressionengines'
620 else:
621 else:
621 # This is currently implemented mainly to facilitate testing. In most
622 # This is currently implemented mainly to facilitate testing. In most
622 # cases, the server should be in charge of choosing a compression engine
623 # cases, the server should be in charge of choosing a compression engine
623 # because a server has the most to lose from a sub-optimal choice. (e.g.
624 # because a server has the most to lose from a sub-optimal choice. (e.g.
624 # CPU DoS due to an expensive engine or a network DoS due to poor
625 # CPU DoS due to an expensive engine or a network DoS due to poor
625 # compression ratio).
626 # compression ratio).
626 configengines = ui.configlist('experimental',
627 configengines = ui.configlist('experimental',
627 'clientcompressionengines')
628 'clientcompressionengines')
628 config = 'experimental.clientcompressionengines'
629 config = 'experimental.clientcompressionengines'
629
630
630 # No explicit config. Filter out the ones that aren't supposed to be
631 # No explicit config. Filter out the ones that aren't supposed to be
631 # advertised and return default ordering.
632 # advertised and return default ordering.
632 if not configengines:
633 if not configengines:
633 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
634 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
634 return [e for e in compengines
635 return [e for e in compengines
635 if getattr(e.wireprotosupport(), attr) > 0]
636 if getattr(e.wireprotosupport(), attr) > 0]
636
637
637 # If compression engines are listed in the config, assume there is a good
638 # If compression engines are listed in the config, assume there is a good
638 # reason for it (like server operators wanting to achieve specific
639 # reason for it (like server operators wanting to achieve specific
639 # performance characteristics). So fail fast if the config references
640 # performance characteristics). So fail fast if the config references
640 # unusable compression engines.
641 # unusable compression engines.
641 validnames = set(e.name() for e in compengines)
642 validnames = set(e.name() for e in compengines)
642 invalidnames = set(e for e in configengines if e not in validnames)
643 invalidnames = set(e for e in configengines if e not in validnames)
643 if invalidnames:
644 if invalidnames:
644 raise error.Abort(_('invalid compression engine defined in %s: %s') %
645 raise error.Abort(_('invalid compression engine defined in %s: %s') %
645 (config, ', '.join(sorted(invalidnames))))
646 (config, ', '.join(sorted(invalidnames))))
646
647
647 compengines = [e for e in compengines if e.name() in configengines]
648 compengines = [e for e in compengines if e.name() in configengines]
648 compengines = sorted(compengines,
649 compengines = sorted(compengines,
649 key=lambda e: configengines.index(e.name()))
650 key=lambda e: configengines.index(e.name()))
650
651
651 if not compengines:
652 if not compengines:
652 raise error.Abort(_('%s config option does not specify any known '
653 raise error.Abort(_('%s config option does not specify any known '
653 'compression engines') % config,
654 'compression engines') % config,
654 hint=_('usable compression engines: %s') %
655 hint=_('usable compression engines: %s') %
655 ', '.sorted(validnames))
656 ', '.sorted(validnames))
656
657
657 return compengines
658 return compengines
658
659
659 # list of commands
660 # list of commands
660 commands = {}
661 commands = {}
661
662
662 def wireprotocommand(name, args=''):
663 def wireprotocommand(name, args=''):
663 """decorator for wire protocol command"""
664 """decorator for wire protocol command"""
664 def register(func):
665 def register(func):
665 commands[name] = (func, args)
666 commands[name] = (func, args)
666 return func
667 return func
667 return register
668 return register
668
669
669 @wireprotocommand('batch', 'cmds *')
670 @wireprotocommand('batch', 'cmds *')
670 def batch(repo, proto, cmds, others):
671 def batch(repo, proto, cmds, others):
671 repo = repo.filtered("served")
672 repo = repo.filtered("served")
672 res = []
673 res = []
673 for pair in cmds.split(';'):
674 for pair in cmds.split(';'):
674 op, args = pair.split(' ', 1)
675 op, args = pair.split(' ', 1)
675 vals = {}
676 vals = {}
676 for a in args.split(','):
677 for a in args.split(','):
677 if a:
678 if a:
678 n, v = a.split('=')
679 n, v = a.split('=')
679 vals[unescapearg(n)] = unescapearg(v)
680 vals[unescapearg(n)] = unescapearg(v)
680 func, spec = commands[op]
681 func, spec = commands[op]
681 if spec:
682 if spec:
682 keys = spec.split()
683 keys = spec.split()
683 data = {}
684 data = {}
684 for k in keys:
685 for k in keys:
685 if k == '*':
686 if k == '*':
686 star = {}
687 star = {}
687 for key in vals.keys():
688 for key in vals.keys():
688 if key not in keys:
689 if key not in keys:
689 star[key] = vals[key]
690 star[key] = vals[key]
690 data['*'] = star
691 data['*'] = star
691 else:
692 else:
692 data[k] = vals[k]
693 data[k] = vals[k]
693 result = func(repo, proto, *[data[k] for k in keys])
694 result = func(repo, proto, *[data[k] for k in keys])
694 else:
695 else:
695 result = func(repo, proto)
696 result = func(repo, proto)
696 if isinstance(result, ooberror):
697 if isinstance(result, ooberror):
697 return result
698 return result
698 res.append(escapearg(result))
699 res.append(escapearg(result))
699 return ';'.join(res)
700 return ';'.join(res)
700
701
701 @wireprotocommand('between', 'pairs')
702 @wireprotocommand('between', 'pairs')
702 def between(repo, proto, pairs):
703 def between(repo, proto, pairs):
703 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
704 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
704 r = []
705 r = []
705 for b in repo.between(pairs):
706 for b in repo.between(pairs):
706 r.append(encodelist(b) + "\n")
707 r.append(encodelist(b) + "\n")
707 return "".join(r)
708 return "".join(r)
708
709
709 @wireprotocommand('branchmap')
710 @wireprotocommand('branchmap')
710 def branchmap(repo, proto):
711 def branchmap(repo, proto):
711 branchmap = repo.branchmap()
712 branchmap = repo.branchmap()
712 heads = []
713 heads = []
713 for branch, nodes in branchmap.iteritems():
714 for branch, nodes in branchmap.iteritems():
714 branchname = urlreq.quote(encoding.fromlocal(branch))
715 branchname = urlreq.quote(encoding.fromlocal(branch))
715 branchnodes = encodelist(nodes)
716 branchnodes = encodelist(nodes)
716 heads.append('%s %s' % (branchname, branchnodes))
717 heads.append('%s %s' % (branchname, branchnodes))
717 return '\n'.join(heads)
718 return '\n'.join(heads)
718
719
719 @wireprotocommand('branches', 'nodes')
720 @wireprotocommand('branches', 'nodes')
720 def branches(repo, proto, nodes):
721 def branches(repo, proto, nodes):
721 nodes = decodelist(nodes)
722 nodes = decodelist(nodes)
722 r = []
723 r = []
723 for b in repo.branches(nodes):
724 for b in repo.branches(nodes):
724 r.append(encodelist(b) + "\n")
725 r.append(encodelist(b) + "\n")
725 return "".join(r)
726 return "".join(r)
726
727
727 @wireprotocommand('clonebundles', '')
728 @wireprotocommand('clonebundles', '')
728 def clonebundles(repo, proto):
729 def clonebundles(repo, proto):
729 """Server command for returning info for available bundles to seed clones.
730 """Server command for returning info for available bundles to seed clones.
730
731
731 Clients will parse this response and determine what bundle to fetch.
732 Clients will parse this response and determine what bundle to fetch.
732
733
733 Extensions may wrap this command to filter or dynamically emit data
734 Extensions may wrap this command to filter or dynamically emit data
734 depending on the request. e.g. you could advertise URLs for the closest
735 depending on the request. e.g. you could advertise URLs for the closest
735 data center given the client's IP address.
736 data center given the client's IP address.
736 """
737 """
737 return repo.vfs.tryread('clonebundles.manifest')
738 return repo.vfs.tryread('clonebundles.manifest')
738
739
739 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
740 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
740 'known', 'getbundle', 'unbundlehash', 'batch']
741 'known', 'getbundle', 'unbundlehash', 'batch']
741
742
742 def _capabilities(repo, proto):
743 def _capabilities(repo, proto):
743 """return a list of capabilities for a repo
744 """return a list of capabilities for a repo
744
745
745 This function exists to allow extensions to easily wrap capabilities
746 This function exists to allow extensions to easily wrap capabilities
746 computation
747 computation
747
748
748 - returns a lists: easy to alter
749 - returns a lists: easy to alter
749 - change done here will be propagated to both `capabilities` and `hello`
750 - change done here will be propagated to both `capabilities` and `hello`
750 command without any other action needed.
751 command without any other action needed.
751 """
752 """
752 # copy to prevent modification of the global list
753 # copy to prevent modification of the global list
753 caps = list(wireprotocaps)
754 caps = list(wireprotocaps)
754 if streamclone.allowservergeneration(repo):
755 if streamclone.allowservergeneration(repo):
755 if repo.ui.configbool('server', 'preferuncompressed'):
756 if repo.ui.configbool('server', 'preferuncompressed'):
756 caps.append('stream-preferred')
757 caps.append('stream-preferred')
757 requiredformats = repo.requirements & repo.supportedformats
758 requiredformats = repo.requirements & repo.supportedformats
758 # if our local revlogs are just revlogv1, add 'stream' cap
759 # if our local revlogs are just revlogv1, add 'stream' cap
759 if not requiredformats - {'revlogv1'}:
760 if not requiredformats - {'revlogv1'}:
760 caps.append('stream')
761 caps.append('stream')
761 # otherwise, add 'streamreqs' detailing our local revlog format
762 # otherwise, add 'streamreqs' detailing our local revlog format
762 else:
763 else:
763 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
764 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
764 if repo.ui.configbool('experimental', 'bundle2-advertise'):
765 if repo.ui.configbool('experimental', 'bundle2-advertise'):
765 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
766 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
766 caps.append('bundle2=' + urlreq.quote(capsblob))
767 caps.append('bundle2=' + urlreq.quote(capsblob))
767 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
768 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
768
769
769 if proto.name == 'http':
770 if proto.name == 'http':
770 caps.append('httpheader=%d' %
771 caps.append('httpheader=%d' %
771 repo.ui.configint('server', 'maxhttpheaderlen'))
772 repo.ui.configint('server', 'maxhttpheaderlen'))
772 if repo.ui.configbool('experimental', 'httppostargs'):
773 if repo.ui.configbool('experimental', 'httppostargs'):
773 caps.append('httppostargs')
774 caps.append('httppostargs')
774
775
775 # FUTURE advertise 0.2rx once support is implemented
776 # FUTURE advertise 0.2rx once support is implemented
776 # FUTURE advertise minrx and mintx after consulting config option
777 # FUTURE advertise minrx and mintx after consulting config option
777 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
778 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
778
779
779 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
780 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
780 if compengines:
781 if compengines:
781 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
782 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
782 for e in compengines)
783 for e in compengines)
783 caps.append('compression=%s' % comptypes)
784 caps.append('compression=%s' % comptypes)
784
785
785 return caps
786 return caps
786
787
787 # If you are writing an extension and consider wrapping this function. Wrap
788 # If you are writing an extension and consider wrapping this function. Wrap
788 # `_capabilities` instead.
789 # `_capabilities` instead.
789 @wireprotocommand('capabilities')
790 @wireprotocommand('capabilities')
790 def capabilities(repo, proto):
791 def capabilities(repo, proto):
791 return ' '.join(_capabilities(repo, proto))
792 return ' '.join(_capabilities(repo, proto))
792
793
793 @wireprotocommand('changegroup', 'roots')
794 @wireprotocommand('changegroup', 'roots')
794 def changegroup(repo, proto, roots):
795 def changegroup(repo, proto, roots):
795 nodes = decodelist(roots)
796 nodes = decodelist(roots)
796 cg = changegroupmod.changegroup(repo, nodes, 'serve')
797 cg = changegroupmod.changegroup(repo, nodes, 'serve')
797 return streamres(reader=cg, v1compressible=True)
798 return streamres(reader=cg, v1compressible=True)
798
799
799 @wireprotocommand('changegroupsubset', 'bases heads')
800 @wireprotocommand('changegroupsubset', 'bases heads')
800 def changegroupsubset(repo, proto, bases, heads):
801 def changegroupsubset(repo, proto, bases, heads):
801 bases = decodelist(bases)
802 bases = decodelist(bases)
802 heads = decodelist(heads)
803 heads = decodelist(heads)
803 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
804 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
804 return streamres(reader=cg, v1compressible=True)
805 return streamres(reader=cg, v1compressible=True)
805
806
806 @wireprotocommand('debugwireargs', 'one two *')
807 @wireprotocommand('debugwireargs', 'one two *')
807 def debugwireargs(repo, proto, one, two, others):
808 def debugwireargs(repo, proto, one, two, others):
808 # only accept optional args from the known set
809 # only accept optional args from the known set
809 opts = options('debugwireargs', ['three', 'four'], others)
810 opts = options('debugwireargs', ['three', 'four'], others)
810 return repo.debugwireargs(one, two, **opts)
811 return repo.debugwireargs(one, two, **opts)
811
812
812 @wireprotocommand('getbundle', '*')
813 @wireprotocommand('getbundle', '*')
813 def getbundle(repo, proto, others):
814 def getbundle(repo, proto, others):
814 opts = options('getbundle', gboptsmap.keys(), others)
815 opts = options('getbundle', gboptsmap.keys(), others)
815 for k, v in opts.iteritems():
816 for k, v in opts.iteritems():
816 keytype = gboptsmap[k]
817 keytype = gboptsmap[k]
817 if keytype == 'nodes':
818 if keytype == 'nodes':
818 opts[k] = decodelist(v)
819 opts[k] = decodelist(v)
819 elif keytype == 'csv':
820 elif keytype == 'csv':
820 opts[k] = list(v.split(','))
821 opts[k] = list(v.split(','))
821 elif keytype == 'scsv':
822 elif keytype == 'scsv':
822 opts[k] = set(v.split(','))
823 opts[k] = set(v.split(','))
823 elif keytype == 'boolean':
824 elif keytype == 'boolean':
824 # Client should serialize False as '0', which is a non-empty string
825 # Client should serialize False as '0', which is a non-empty string
825 # so it evaluates as a True bool.
826 # so it evaluates as a True bool.
826 if v == '0':
827 if v == '0':
827 opts[k] = False
828 opts[k] = False
828 else:
829 else:
829 opts[k] = bool(v)
830 opts[k] = bool(v)
830 elif keytype != 'plain':
831 elif keytype != 'plain':
831 raise KeyError('unknown getbundle option type %s'
832 raise KeyError('unknown getbundle option type %s'
832 % keytype)
833 % keytype)
833
834
834 if not bundle1allowed(repo, 'pull'):
835 if not bundle1allowed(repo, 'pull'):
835 if not exchange.bundle2requested(opts.get('bundlecaps')):
836 if not exchange.bundle2requested(opts.get('bundlecaps')):
836 if proto.name == 'http':
837 if proto.name == 'http':
837 return ooberror(bundle2required)
838 return ooberror(bundle2required)
838 raise error.Abort(bundle2requiredmain,
839 raise error.Abort(bundle2requiredmain,
839 hint=bundle2requiredhint)
840 hint=bundle2requiredhint)
840
841
841 try:
842 try:
842 if repo.ui.configbool('server', 'disablefullbundle'):
843 if repo.ui.configbool('server', 'disablefullbundle'):
843 # Check to see if this is a full clone.
844 # Check to see if this is a full clone.
844 clheads = set(repo.changelog.heads())
845 clheads = set(repo.changelog.heads())
845 heads = set(opts.get('heads', set()))
846 heads = set(opts.get('heads', set()))
846 common = set(opts.get('common', set()))
847 common = set(opts.get('common', set()))
847 common.discard(nullid)
848 common.discard(nullid)
848 if not common and clheads == heads:
849 if not common and clheads == heads:
849 raise error.Abort(
850 raise error.Abort(
850 _('server has pull-based clones disabled'),
851 _('server has pull-based clones disabled'),
851 hint=_('remove --pull if specified or upgrade Mercurial'))
852 hint=_('remove --pull if specified or upgrade Mercurial'))
852
853
853 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
854 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
854 except error.Abort as exc:
855 except error.Abort as exc:
855 # cleanly forward Abort error to the client
856 # cleanly forward Abort error to the client
856 if not exchange.bundle2requested(opts.get('bundlecaps')):
857 if not exchange.bundle2requested(opts.get('bundlecaps')):
857 if proto.name == 'http':
858 if proto.name == 'http':
858 return ooberror(str(exc) + '\n')
859 return ooberror(str(exc) + '\n')
859 raise # cannot do better for bundle1 + ssh
860 raise # cannot do better for bundle1 + ssh
860 # bundle2 request expect a bundle2 reply
861 # bundle2 request expect a bundle2 reply
861 bundler = bundle2.bundle20(repo.ui)
862 bundler = bundle2.bundle20(repo.ui)
862 manargs = [('message', str(exc))]
863 manargs = [('message', str(exc))]
863 advargs = []
864 advargs = []
864 if exc.hint is not None:
865 if exc.hint is not None:
865 advargs.append(('hint', exc.hint))
866 advargs.append(('hint', exc.hint))
866 bundler.addpart(bundle2.bundlepart('error:abort',
867 bundler.addpart(bundle2.bundlepart('error:abort',
867 manargs, advargs))
868 manargs, advargs))
868 return streamres(gen=bundler.getchunks(), v1compressible=True)
869 return streamres(gen=bundler.getchunks(), v1compressible=True)
869 return streamres(gen=chunks, v1compressible=True)
870 return streamres(gen=chunks, v1compressible=True)
870
871
871 @wireprotocommand('heads')
872 @wireprotocommand('heads')
872 def heads(repo, proto):
873 def heads(repo, proto):
873 h = repo.heads()
874 h = repo.heads()
874 return encodelist(h) + "\n"
875 return encodelist(h) + "\n"
875
876
876 @wireprotocommand('hello')
877 @wireprotocommand('hello')
877 def hello(repo, proto):
878 def hello(repo, proto):
878 '''the hello command returns a set of lines describing various
879 '''the hello command returns a set of lines describing various
879 interesting things about the server, in an RFC822-like format.
880 interesting things about the server, in an RFC822-like format.
880 Currently the only one defined is "capabilities", which
881 Currently the only one defined is "capabilities", which
881 consists of a line in the form:
882 consists of a line in the form:
882
883
883 capabilities: space separated list of tokens
884 capabilities: space separated list of tokens
884 '''
885 '''
885 return "capabilities: %s\n" % (capabilities(repo, proto))
886 return "capabilities: %s\n" % (capabilities(repo, proto))
886
887
887 @wireprotocommand('listkeys', 'namespace')
888 @wireprotocommand('listkeys', 'namespace')
888 def listkeys(repo, proto, namespace):
889 def listkeys(repo, proto, namespace):
889 d = repo.listkeys(encoding.tolocal(namespace)).items()
890 d = repo.listkeys(encoding.tolocal(namespace)).items()
890 return pushkeymod.encodekeys(d)
891 return pushkeymod.encodekeys(d)
891
892
892 @wireprotocommand('lookup', 'key')
893 @wireprotocommand('lookup', 'key')
893 def lookup(repo, proto, key):
894 def lookup(repo, proto, key):
894 try:
895 try:
895 k = encoding.tolocal(key)
896 k = encoding.tolocal(key)
896 c = repo[k]
897 c = repo[k]
897 r = c.hex()
898 r = c.hex()
898 success = 1
899 success = 1
899 except Exception as inst:
900 except Exception as inst:
900 r = str(inst)
901 r = str(inst)
901 success = 0
902 success = 0
902 return "%s %s\n" % (success, r)
903 return "%s %s\n" % (success, r)
903
904
904 @wireprotocommand('known', 'nodes *')
905 @wireprotocommand('known', 'nodes *')
905 def known(repo, proto, nodes, others):
906 def known(repo, proto, nodes, others):
906 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
907 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
907
908
908 @wireprotocommand('pushkey', 'namespace key old new')
909 @wireprotocommand('pushkey', 'namespace key old new')
909 def pushkey(repo, proto, namespace, key, old, new):
910 def pushkey(repo, proto, namespace, key, old, new):
910 # compatibility with pre-1.8 clients which were accidentally
911 # compatibility with pre-1.8 clients which were accidentally
911 # sending raw binary nodes rather than utf-8-encoded hex
912 # sending raw binary nodes rather than utf-8-encoded hex
912 if len(new) == 20 and util.escapestr(new) != new:
913 if len(new) == 20 and util.escapestr(new) != new:
913 # looks like it could be a binary node
914 # looks like it could be a binary node
914 try:
915 try:
915 new.decode('utf-8')
916 new.decode('utf-8')
916 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
917 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
917 except UnicodeDecodeError:
918 except UnicodeDecodeError:
918 pass # binary, leave unmodified
919 pass # binary, leave unmodified
919 else:
920 else:
920 new = encoding.tolocal(new) # normal path
921 new = encoding.tolocal(new) # normal path
921
922
922 if util.safehasattr(proto, 'restore'):
923 if util.safehasattr(proto, 'restore'):
923
924
924 proto.redirect()
925 proto.redirect()
925
926
926 try:
927 try:
927 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
928 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
928 encoding.tolocal(old), new) or False
929 encoding.tolocal(old), new) or False
929 except error.Abort:
930 except error.Abort:
930 r = False
931 r = False
931
932
932 output = proto.restore()
933 output = proto.restore()
933
934
934 return '%s\n%s' % (int(r), output)
935 return '%s\n%s' % (int(r), output)
935
936
936 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
937 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
937 encoding.tolocal(old), new)
938 encoding.tolocal(old), new)
938 return '%s\n' % int(r)
939 return '%s\n' % int(r)
939
940
940 @wireprotocommand('stream_out')
941 @wireprotocommand('stream_out')
941 def stream(repo, proto):
942 def stream(repo, proto):
942 '''If the server supports streaming clone, it advertises the "stream"
943 '''If the server supports streaming clone, it advertises the "stream"
943 capability with a value representing the version and flags of the repo
944 capability with a value representing the version and flags of the repo
944 it is serving. Client checks to see if it understands the format.
945 it is serving. Client checks to see if it understands the format.
945 '''
946 '''
946 if not streamclone.allowservergeneration(repo):
947 if not streamclone.allowservergeneration(repo):
947 return '1\n'
948 return '1\n'
948
949
949 def getstream(it):
950 def getstream(it):
950 yield '0\n'
951 yield '0\n'
951 for chunk in it:
952 for chunk in it:
952 yield chunk
953 yield chunk
953
954
954 try:
955 try:
955 # LockError may be raised before the first result is yielded. Don't
956 # LockError may be raised before the first result is yielded. Don't
956 # emit output until we're sure we got the lock successfully.
957 # emit output until we're sure we got the lock successfully.
957 it = streamclone.generatev1wireproto(repo)
958 it = streamclone.generatev1wireproto(repo)
958 return streamres(gen=getstream(it))
959 return streamres(gen=getstream(it))
959 except error.LockError:
960 except error.LockError:
960 return '2\n'
961 return '2\n'
961
962
962 @wireprotocommand('unbundle', 'heads')
963 @wireprotocommand('unbundle', 'heads')
963 def unbundle(repo, proto, heads):
964 def unbundle(repo, proto, heads):
964 their_heads = decodelist(heads)
965 their_heads = decodelist(heads)
965
966
966 try:
967 try:
967 proto.redirect()
968 proto.redirect()
968
969
969 exchange.check_heads(repo, their_heads, 'preparing changes')
970 exchange.check_heads(repo, their_heads, 'preparing changes')
970
971
971 # write bundle data to temporary file because it can be big
972 # write bundle data to temporary file because it can be big
972 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
973 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
973 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
974 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
974 r = 0
975 r = 0
975 try:
976 try:
976 proto.getfile(fp)
977 proto.getfile(fp)
977 fp.seek(0)
978 fp.seek(0)
978 gen = exchange.readbundle(repo.ui, fp, None)
979 gen = exchange.readbundle(repo.ui, fp, None)
979 if (isinstance(gen, changegroupmod.cg1unpacker)
980 if (isinstance(gen, changegroupmod.cg1unpacker)
980 and not bundle1allowed(repo, 'push')):
981 and not bundle1allowed(repo, 'push')):
981 if proto.name == 'http':
982 if proto.name == 'http':
982 # need to special case http because stderr do not get to
983 # need to special case http because stderr do not get to
983 # the http client on failed push so we need to abuse some
984 # the http client on failed push so we need to abuse some
984 # other error type to make sure the message get to the
985 # other error type to make sure the message get to the
985 # user.
986 # user.
986 return ooberror(bundle2required)
987 return ooberror(bundle2required)
987 raise error.Abort(bundle2requiredmain,
988 raise error.Abort(bundle2requiredmain,
988 hint=bundle2requiredhint)
989 hint=bundle2requiredhint)
989
990
990 r = exchange.unbundle(repo, gen, their_heads, 'serve',
991 r = exchange.unbundle(repo, gen, their_heads, 'serve',
991 proto._client())
992 proto._client())
992 if util.safehasattr(r, 'addpart'):
993 if util.safehasattr(r, 'addpart'):
993 # The return looks streamable, we are in the bundle2 case and
994 # The return looks streamable, we are in the bundle2 case and
994 # should return a stream.
995 # should return a stream.
995 return streamres(gen=r.getchunks())
996 return streamres(gen=r.getchunks())
996 return pushres(r)
997 return pushres(r)
997
998
998 finally:
999 finally:
999 fp.close()
1000 fp.close()
1000 os.unlink(tempname)
1001 os.unlink(tempname)
1001
1002
1002 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1003 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1003 # handle non-bundle2 case first
1004 # handle non-bundle2 case first
1004 if not getattr(exc, 'duringunbundle2', False):
1005 if not getattr(exc, 'duringunbundle2', False):
1005 try:
1006 try:
1006 raise
1007 raise
1007 except error.Abort:
1008 except error.Abort:
1008 # The old code we moved used util.stderr directly.
1009 # The old code we moved used util.stderr directly.
1009 # We did not change it to minimise code change.
1010 # We did not change it to minimise code change.
1010 # This need to be moved to something proper.
1011 # This need to be moved to something proper.
1011 # Feel free to do it.
1012 # Feel free to do it.
1012 util.stderr.write("abort: %s\n" % exc)
1013 util.stderr.write("abort: %s\n" % exc)
1013 if exc.hint is not None:
1014 if exc.hint is not None:
1014 util.stderr.write("(%s)\n" % exc.hint)
1015 util.stderr.write("(%s)\n" % exc.hint)
1015 return pushres(0)
1016 return pushres(0)
1016 except error.PushRaced:
1017 except error.PushRaced:
1017 return pusherr(str(exc))
1018 return pusherr(str(exc))
1018
1019
1019 bundler = bundle2.bundle20(repo.ui)
1020 bundler = bundle2.bundle20(repo.ui)
1020 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1021 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1021 bundler.addpart(out)
1022 bundler.addpart(out)
1022 try:
1023 try:
1023 try:
1024 try:
1024 raise
1025 raise
1025 except error.PushkeyFailed as exc:
1026 except error.PushkeyFailed as exc:
1026 # check client caps
1027 # check client caps
1027 remotecaps = getattr(exc, '_replycaps', None)
1028 remotecaps = getattr(exc, '_replycaps', None)
1028 if (remotecaps is not None
1029 if (remotecaps is not None
1029 and 'pushkey' not in remotecaps.get('error', ())):
1030 and 'pushkey' not in remotecaps.get('error', ())):
1030 # no support remote side, fallback to Abort handler.
1031 # no support remote side, fallback to Abort handler.
1031 raise
1032 raise
1032 part = bundler.newpart('error:pushkey')
1033 part = bundler.newpart('error:pushkey')
1033 part.addparam('in-reply-to', exc.partid)
1034 part.addparam('in-reply-to', exc.partid)
1034 if exc.namespace is not None:
1035 if exc.namespace is not None:
1035 part.addparam('namespace', exc.namespace, mandatory=False)
1036 part.addparam('namespace', exc.namespace, mandatory=False)
1036 if exc.key is not None:
1037 if exc.key is not None:
1037 part.addparam('key', exc.key, mandatory=False)
1038 part.addparam('key', exc.key, mandatory=False)
1038 if exc.new is not None:
1039 if exc.new is not None:
1039 part.addparam('new', exc.new, mandatory=False)
1040 part.addparam('new', exc.new, mandatory=False)
1040 if exc.old is not None:
1041 if exc.old is not None:
1041 part.addparam('old', exc.old, mandatory=False)
1042 part.addparam('old', exc.old, mandatory=False)
1042 if exc.ret is not None:
1043 if exc.ret is not None:
1043 part.addparam('ret', exc.ret, mandatory=False)
1044 part.addparam('ret', exc.ret, mandatory=False)
1044 except error.BundleValueError as exc:
1045 except error.BundleValueError as exc:
1045 errpart = bundler.newpart('error:unsupportedcontent')
1046 errpart = bundler.newpart('error:unsupportedcontent')
1046 if exc.parttype is not None:
1047 if exc.parttype is not None:
1047 errpart.addparam('parttype', exc.parttype)
1048 errpart.addparam('parttype', exc.parttype)
1048 if exc.params:
1049 if exc.params:
1049 errpart.addparam('params', '\0'.join(exc.params))
1050 errpart.addparam('params', '\0'.join(exc.params))
1050 except error.Abort as exc:
1051 except error.Abort as exc:
1051 manargs = [('message', str(exc))]
1052 manargs = [('message', str(exc))]
1052 advargs = []
1053 advargs = []
1053 if exc.hint is not None:
1054 if exc.hint is not None:
1054 advargs.append(('hint', exc.hint))
1055 advargs.append(('hint', exc.hint))
1055 bundler.addpart(bundle2.bundlepart('error:abort',
1056 bundler.addpart(bundle2.bundlepart('error:abort',
1056 manargs, advargs))
1057 manargs, advargs))
1057 except error.PushRaced as exc:
1058 except error.PushRaced as exc:
1058 bundler.newpart('error:pushraced', [('message', str(exc))])
1059 bundler.newpart('error:pushraced', [('message', str(exc))])
1059 return streamres(gen=bundler.getchunks())
1060 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now