##// END OF EJS Templates
wireproto: fix lingering str(exception) with util.forcebytestr(exception)...
Augie Fackler -
r36332:be9c497e default
parent child Browse files
Show More
@@ -1,1066 +1,1067 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import os
11 import os
12 import tempfile
12 import tempfile
13
13
14 from .i18n import _
14 from .i18n import _
15 from .node import (
15 from .node import (
16 bin,
16 bin,
17 hex,
17 hex,
18 nullid,
18 nullid,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 discovery,
24 discovery,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 pycompat,
30 pycompat,
31 repository,
31 repository,
32 streamclone,
32 streamclone,
33 util,
33 util,
34 wireprototypes,
34 wireprototypes,
35 )
35 )
36
36
37 urlerr = util.urlerr
37 urlerr = util.urlerr
38 urlreq = util.urlreq
38 urlreq = util.urlreq
39
39
40 bytesresponse = wireprototypes.bytesresponse
40 bytesresponse = wireprototypes.bytesresponse
41 ooberror = wireprototypes.ooberror
41 ooberror = wireprototypes.ooberror
42 pushres = wireprototypes.pushres
42 pushres = wireprototypes.pushres
43 pusherr = wireprototypes.pusherr
43 pusherr = wireprototypes.pusherr
44 streamres = wireprototypes.streamres
44 streamres = wireprototypes.streamres
45 streamres_legacy = wireprototypes.streamreslegacy
45 streamres_legacy = wireprototypes.streamreslegacy
46
46
47 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
47 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
48 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
48 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
49 'IncompatibleClient')
49 'IncompatibleClient')
50 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
50 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
51
51
52 class remoteiterbatcher(peer.iterbatcher):
52 class remoteiterbatcher(peer.iterbatcher):
53 def __init__(self, remote):
53 def __init__(self, remote):
54 super(remoteiterbatcher, self).__init__()
54 super(remoteiterbatcher, self).__init__()
55 self._remote = remote
55 self._remote = remote
56
56
57 def __getattr__(self, name):
57 def __getattr__(self, name):
58 # Validate this method is batchable, since submit() only supports
58 # Validate this method is batchable, since submit() only supports
59 # batchable methods.
59 # batchable methods.
60 fn = getattr(self._remote, name)
60 fn = getattr(self._remote, name)
61 if not getattr(fn, 'batchable', None):
61 if not getattr(fn, 'batchable', None):
62 raise error.ProgrammingError('Attempted to batch a non-batchable '
62 raise error.ProgrammingError('Attempted to batch a non-batchable '
63 'call to %r' % name)
63 'call to %r' % name)
64
64
65 return super(remoteiterbatcher, self).__getattr__(name)
65 return super(remoteiterbatcher, self).__getattr__(name)
66
66
67 def submit(self):
67 def submit(self):
68 """Break the batch request into many patch calls and pipeline them.
68 """Break the batch request into many patch calls and pipeline them.
69
69
70 This is mostly valuable over http where request sizes can be
70 This is mostly valuable over http where request sizes can be
71 limited, but can be used in other places as well.
71 limited, but can be used in other places as well.
72 """
72 """
73 # 2-tuple of (command, arguments) that represents what will be
73 # 2-tuple of (command, arguments) that represents what will be
74 # sent over the wire.
74 # sent over the wire.
75 requests = []
75 requests = []
76
76
77 # 4-tuple of (command, final future, @batchable generator, remote
77 # 4-tuple of (command, final future, @batchable generator, remote
78 # future).
78 # future).
79 results = []
79 results = []
80
80
81 for command, args, opts, finalfuture in self.calls:
81 for command, args, opts, finalfuture in self.calls:
82 mtd = getattr(self._remote, command)
82 mtd = getattr(self._remote, command)
83 batchable = mtd.batchable(mtd.__self__, *args, **opts)
83 batchable = mtd.batchable(mtd.__self__, *args, **opts)
84
84
85 commandargs, fremote = next(batchable)
85 commandargs, fremote = next(batchable)
86 assert fremote
86 assert fremote
87 requests.append((command, commandargs))
87 requests.append((command, commandargs))
88 results.append((command, finalfuture, batchable, fremote))
88 results.append((command, finalfuture, batchable, fremote))
89
89
90 if requests:
90 if requests:
91 self._resultiter = self._remote._submitbatch(requests)
91 self._resultiter = self._remote._submitbatch(requests)
92
92
93 self._results = results
93 self._results = results
94
94
95 def results(self):
95 def results(self):
96 for command, finalfuture, batchable, remotefuture in self._results:
96 for command, finalfuture, batchable, remotefuture in self._results:
97 # Get the raw result, set it in the remote future, feed it
97 # Get the raw result, set it in the remote future, feed it
98 # back into the @batchable generator so it can be decoded, and
98 # back into the @batchable generator so it can be decoded, and
99 # set the result on the final future to this value.
99 # set the result on the final future to this value.
100 remoteresult = next(self._resultiter)
100 remoteresult = next(self._resultiter)
101 remotefuture.set(remoteresult)
101 remotefuture.set(remoteresult)
102 finalfuture.set(next(batchable))
102 finalfuture.set(next(batchable))
103
103
104 # Verify our @batchable generators only emit 2 values.
104 # Verify our @batchable generators only emit 2 values.
105 try:
105 try:
106 next(batchable)
106 next(batchable)
107 except StopIteration:
107 except StopIteration:
108 pass
108 pass
109 else:
109 else:
110 raise error.ProgrammingError('%s @batchable generator emitted '
110 raise error.ProgrammingError('%s @batchable generator emitted '
111 'unexpected value count' % command)
111 'unexpected value count' % command)
112
112
113 yield finalfuture.value
113 yield finalfuture.value
114
114
115 # Forward a couple of names from peer to make wireproto interactions
115 # Forward a couple of names from peer to make wireproto interactions
116 # slightly more sensible.
116 # slightly more sensible.
117 batchable = peer.batchable
117 batchable = peer.batchable
118 future = peer.future
118 future = peer.future
119
119
120 # list of nodes encoding / decoding
120 # list of nodes encoding / decoding
121
121
122 def decodelist(l, sep=' '):
122 def decodelist(l, sep=' '):
123 if l:
123 if l:
124 return [bin(v) for v in l.split(sep)]
124 return [bin(v) for v in l.split(sep)]
125 return []
125 return []
126
126
127 def encodelist(l, sep=' '):
127 def encodelist(l, sep=' '):
128 try:
128 try:
129 return sep.join(map(hex, l))
129 return sep.join(map(hex, l))
130 except TypeError:
130 except TypeError:
131 raise
131 raise
132
132
133 # batched call argument encoding
133 # batched call argument encoding
134
134
135 def escapearg(plain):
135 def escapearg(plain):
136 return (plain
136 return (plain
137 .replace(':', ':c')
137 .replace(':', ':c')
138 .replace(',', ':o')
138 .replace(',', ':o')
139 .replace(';', ':s')
139 .replace(';', ':s')
140 .replace('=', ':e'))
140 .replace('=', ':e'))
141
141
142 def unescapearg(escaped):
142 def unescapearg(escaped):
143 return (escaped
143 return (escaped
144 .replace(':e', '=')
144 .replace(':e', '=')
145 .replace(':s', ';')
145 .replace(':s', ';')
146 .replace(':o', ',')
146 .replace(':o', ',')
147 .replace(':c', ':'))
147 .replace(':c', ':'))
148
148
149 def encodebatchcmds(req):
149 def encodebatchcmds(req):
150 """Return a ``cmds`` argument value for the ``batch`` command."""
150 """Return a ``cmds`` argument value for the ``batch`` command."""
151 cmds = []
151 cmds = []
152 for op, argsdict in req:
152 for op, argsdict in req:
153 # Old servers didn't properly unescape argument names. So prevent
153 # Old servers didn't properly unescape argument names. So prevent
154 # the sending of argument names that may not be decoded properly by
154 # the sending of argument names that may not be decoded properly by
155 # servers.
155 # servers.
156 assert all(escapearg(k) == k for k in argsdict)
156 assert all(escapearg(k) == k for k in argsdict)
157
157
158 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
158 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
159 for k, v in argsdict.iteritems())
159 for k, v in argsdict.iteritems())
160 cmds.append('%s %s' % (op, args))
160 cmds.append('%s %s' % (op, args))
161
161
162 return ';'.join(cmds)
162 return ';'.join(cmds)
163
163
164 # mapping of options accepted by getbundle and their types
164 # mapping of options accepted by getbundle and their types
165 #
165 #
166 # Meant to be extended by extensions. It is extensions responsibility to ensure
166 # Meant to be extended by extensions. It is extensions responsibility to ensure
167 # such options are properly processed in exchange.getbundle.
167 # such options are properly processed in exchange.getbundle.
168 #
168 #
169 # supported types are:
169 # supported types are:
170 #
170 #
171 # :nodes: list of binary nodes
171 # :nodes: list of binary nodes
172 # :csv: list of comma-separated values
172 # :csv: list of comma-separated values
173 # :scsv: list of comma-separated values return as set
173 # :scsv: list of comma-separated values return as set
174 # :plain: string with no transformation needed.
174 # :plain: string with no transformation needed.
175 gboptsmap = {'heads': 'nodes',
175 gboptsmap = {'heads': 'nodes',
176 'bookmarks': 'boolean',
176 'bookmarks': 'boolean',
177 'common': 'nodes',
177 'common': 'nodes',
178 'obsmarkers': 'boolean',
178 'obsmarkers': 'boolean',
179 'phases': 'boolean',
179 'phases': 'boolean',
180 'bundlecaps': 'scsv',
180 'bundlecaps': 'scsv',
181 'listkeys': 'csv',
181 'listkeys': 'csv',
182 'cg': 'boolean',
182 'cg': 'boolean',
183 'cbattempted': 'boolean',
183 'cbattempted': 'boolean',
184 'stream': 'boolean',
184 'stream': 'boolean',
185 }
185 }
186
186
187 # client side
187 # client side
188
188
189 class wirepeer(repository.legacypeer):
189 class wirepeer(repository.legacypeer):
190 """Client-side interface for communicating with a peer repository.
190 """Client-side interface for communicating with a peer repository.
191
191
192 Methods commonly call wire protocol commands of the same name.
192 Methods commonly call wire protocol commands of the same name.
193
193
194 See also httppeer.py and sshpeer.py for protocol-specific
194 See also httppeer.py and sshpeer.py for protocol-specific
195 implementations of this interface.
195 implementations of this interface.
196 """
196 """
197 # Begin of basewirepeer interface.
197 # Begin of basewirepeer interface.
198
198
199 def iterbatch(self):
199 def iterbatch(self):
200 return remoteiterbatcher(self)
200 return remoteiterbatcher(self)
201
201
202 @batchable
202 @batchable
203 def lookup(self, key):
203 def lookup(self, key):
204 self.requirecap('lookup', _('look up remote revision'))
204 self.requirecap('lookup', _('look up remote revision'))
205 f = future()
205 f = future()
206 yield {'key': encoding.fromlocal(key)}, f
206 yield {'key': encoding.fromlocal(key)}, f
207 d = f.value
207 d = f.value
208 success, data = d[:-1].split(" ", 1)
208 success, data = d[:-1].split(" ", 1)
209 if int(success):
209 if int(success):
210 yield bin(data)
210 yield bin(data)
211 else:
211 else:
212 self._abort(error.RepoError(data))
212 self._abort(error.RepoError(data))
213
213
214 @batchable
214 @batchable
215 def heads(self):
215 def heads(self):
216 f = future()
216 f = future()
217 yield {}, f
217 yield {}, f
218 d = f.value
218 d = f.value
219 try:
219 try:
220 yield decodelist(d[:-1])
220 yield decodelist(d[:-1])
221 except ValueError:
221 except ValueError:
222 self._abort(error.ResponseError(_("unexpected response:"), d))
222 self._abort(error.ResponseError(_("unexpected response:"), d))
223
223
224 @batchable
224 @batchable
225 def known(self, nodes):
225 def known(self, nodes):
226 f = future()
226 f = future()
227 yield {'nodes': encodelist(nodes)}, f
227 yield {'nodes': encodelist(nodes)}, f
228 d = f.value
228 d = f.value
229 try:
229 try:
230 yield [bool(int(b)) for b in d]
230 yield [bool(int(b)) for b in d]
231 except ValueError:
231 except ValueError:
232 self._abort(error.ResponseError(_("unexpected response:"), d))
232 self._abort(error.ResponseError(_("unexpected response:"), d))
233
233
234 @batchable
234 @batchable
235 def branchmap(self):
235 def branchmap(self):
236 f = future()
236 f = future()
237 yield {}, f
237 yield {}, f
238 d = f.value
238 d = f.value
239 try:
239 try:
240 branchmap = {}
240 branchmap = {}
241 for branchpart in d.splitlines():
241 for branchpart in d.splitlines():
242 branchname, branchheads = branchpart.split(' ', 1)
242 branchname, branchheads = branchpart.split(' ', 1)
243 branchname = encoding.tolocal(urlreq.unquote(branchname))
243 branchname = encoding.tolocal(urlreq.unquote(branchname))
244 branchheads = decodelist(branchheads)
244 branchheads = decodelist(branchheads)
245 branchmap[branchname] = branchheads
245 branchmap[branchname] = branchheads
246 yield branchmap
246 yield branchmap
247 except TypeError:
247 except TypeError:
248 self._abort(error.ResponseError(_("unexpected response:"), d))
248 self._abort(error.ResponseError(_("unexpected response:"), d))
249
249
250 @batchable
250 @batchable
251 def listkeys(self, namespace):
251 def listkeys(self, namespace):
252 if not self.capable('pushkey'):
252 if not self.capable('pushkey'):
253 yield {}, None
253 yield {}, None
254 f = future()
254 f = future()
255 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
255 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
256 yield {'namespace': encoding.fromlocal(namespace)}, f
256 yield {'namespace': encoding.fromlocal(namespace)}, f
257 d = f.value
257 d = f.value
258 self.ui.debug('received listkey for "%s": %i bytes\n'
258 self.ui.debug('received listkey for "%s": %i bytes\n'
259 % (namespace, len(d)))
259 % (namespace, len(d)))
260 yield pushkeymod.decodekeys(d)
260 yield pushkeymod.decodekeys(d)
261
261
262 @batchable
262 @batchable
263 def pushkey(self, namespace, key, old, new):
263 def pushkey(self, namespace, key, old, new):
264 if not self.capable('pushkey'):
264 if not self.capable('pushkey'):
265 yield False, None
265 yield False, None
266 f = future()
266 f = future()
267 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
267 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
268 yield {'namespace': encoding.fromlocal(namespace),
268 yield {'namespace': encoding.fromlocal(namespace),
269 'key': encoding.fromlocal(key),
269 'key': encoding.fromlocal(key),
270 'old': encoding.fromlocal(old),
270 'old': encoding.fromlocal(old),
271 'new': encoding.fromlocal(new)}, f
271 'new': encoding.fromlocal(new)}, f
272 d = f.value
272 d = f.value
273 d, output = d.split('\n', 1)
273 d, output = d.split('\n', 1)
274 try:
274 try:
275 d = bool(int(d))
275 d = bool(int(d))
276 except ValueError:
276 except ValueError:
277 raise error.ResponseError(
277 raise error.ResponseError(
278 _('push failed (unexpected response):'), d)
278 _('push failed (unexpected response):'), d)
279 for l in output.splitlines(True):
279 for l in output.splitlines(True):
280 self.ui.status(_('remote: '), l)
280 self.ui.status(_('remote: '), l)
281 yield d
281 yield d
282
282
283 def stream_out(self):
283 def stream_out(self):
284 return self._callstream('stream_out')
284 return self._callstream('stream_out')
285
285
286 def getbundle(self, source, **kwargs):
286 def getbundle(self, source, **kwargs):
287 kwargs = pycompat.byteskwargs(kwargs)
287 kwargs = pycompat.byteskwargs(kwargs)
288 self.requirecap('getbundle', _('look up remote changes'))
288 self.requirecap('getbundle', _('look up remote changes'))
289 opts = {}
289 opts = {}
290 bundlecaps = kwargs.get('bundlecaps')
290 bundlecaps = kwargs.get('bundlecaps')
291 if bundlecaps is not None:
291 if bundlecaps is not None:
292 kwargs['bundlecaps'] = sorted(bundlecaps)
292 kwargs['bundlecaps'] = sorted(bundlecaps)
293 else:
293 else:
294 bundlecaps = () # kwargs could have it to None
294 bundlecaps = () # kwargs could have it to None
295 for key, value in kwargs.iteritems():
295 for key, value in kwargs.iteritems():
296 if value is None:
296 if value is None:
297 continue
297 continue
298 keytype = gboptsmap.get(key)
298 keytype = gboptsmap.get(key)
299 if keytype is None:
299 if keytype is None:
300 raise error.ProgrammingError(
300 raise error.ProgrammingError(
301 'Unexpectedly None keytype for key %s' % key)
301 'Unexpectedly None keytype for key %s' % key)
302 elif keytype == 'nodes':
302 elif keytype == 'nodes':
303 value = encodelist(value)
303 value = encodelist(value)
304 elif keytype in ('csv', 'scsv'):
304 elif keytype in ('csv', 'scsv'):
305 value = ','.join(value)
305 value = ','.join(value)
306 elif keytype == 'boolean':
306 elif keytype == 'boolean':
307 value = '%i' % bool(value)
307 value = '%i' % bool(value)
308 elif keytype != 'plain':
308 elif keytype != 'plain':
309 raise KeyError('unknown getbundle option type %s'
309 raise KeyError('unknown getbundle option type %s'
310 % keytype)
310 % keytype)
311 opts[key] = value
311 opts[key] = value
312 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
312 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
313 if any((cap.startswith('HG2') for cap in bundlecaps)):
313 if any((cap.startswith('HG2') for cap in bundlecaps)):
314 return bundle2.getunbundler(self.ui, f)
314 return bundle2.getunbundler(self.ui, f)
315 else:
315 else:
316 return changegroupmod.cg1unpacker(f, 'UN')
316 return changegroupmod.cg1unpacker(f, 'UN')
317
317
318 def unbundle(self, cg, heads, url):
318 def unbundle(self, cg, heads, url):
319 '''Send cg (a readable file-like object representing the
319 '''Send cg (a readable file-like object representing the
320 changegroup to push, typically a chunkbuffer object) to the
320 changegroup to push, typically a chunkbuffer object) to the
321 remote server as a bundle.
321 remote server as a bundle.
322
322
323 When pushing a bundle10 stream, return an integer indicating the
323 When pushing a bundle10 stream, return an integer indicating the
324 result of the push (see changegroup.apply()).
324 result of the push (see changegroup.apply()).
325
325
326 When pushing a bundle20 stream, return a bundle20 stream.
326 When pushing a bundle20 stream, return a bundle20 stream.
327
327
328 `url` is the url the client thinks it's pushing to, which is
328 `url` is the url the client thinks it's pushing to, which is
329 visible to hooks.
329 visible to hooks.
330 '''
330 '''
331
331
332 if heads != ['force'] and self.capable('unbundlehash'):
332 if heads != ['force'] and self.capable('unbundlehash'):
333 heads = encodelist(['hashed',
333 heads = encodelist(['hashed',
334 hashlib.sha1(''.join(sorted(heads))).digest()])
334 hashlib.sha1(''.join(sorted(heads))).digest()])
335 else:
335 else:
336 heads = encodelist(heads)
336 heads = encodelist(heads)
337
337
338 if util.safehasattr(cg, 'deltaheader'):
338 if util.safehasattr(cg, 'deltaheader'):
339 # this a bundle10, do the old style call sequence
339 # this a bundle10, do the old style call sequence
340 ret, output = self._callpush("unbundle", cg, heads=heads)
340 ret, output = self._callpush("unbundle", cg, heads=heads)
341 if ret == "":
341 if ret == "":
342 raise error.ResponseError(
342 raise error.ResponseError(
343 _('push failed:'), output)
343 _('push failed:'), output)
344 try:
344 try:
345 ret = int(ret)
345 ret = int(ret)
346 except ValueError:
346 except ValueError:
347 raise error.ResponseError(
347 raise error.ResponseError(
348 _('push failed (unexpected response):'), ret)
348 _('push failed (unexpected response):'), ret)
349
349
350 for l in output.splitlines(True):
350 for l in output.splitlines(True):
351 self.ui.status(_('remote: '), l)
351 self.ui.status(_('remote: '), l)
352 else:
352 else:
353 # bundle2 push. Send a stream, fetch a stream.
353 # bundle2 push. Send a stream, fetch a stream.
354 stream = self._calltwowaystream('unbundle', cg, heads=heads)
354 stream = self._calltwowaystream('unbundle', cg, heads=heads)
355 ret = bundle2.getunbundler(self.ui, stream)
355 ret = bundle2.getunbundler(self.ui, stream)
356 return ret
356 return ret
357
357
358 # End of basewirepeer interface.
358 # End of basewirepeer interface.
359
359
360 # Begin of baselegacywirepeer interface.
360 # Begin of baselegacywirepeer interface.
361
361
362 def branches(self, nodes):
362 def branches(self, nodes):
363 n = encodelist(nodes)
363 n = encodelist(nodes)
364 d = self._call("branches", nodes=n)
364 d = self._call("branches", nodes=n)
365 try:
365 try:
366 br = [tuple(decodelist(b)) for b in d.splitlines()]
366 br = [tuple(decodelist(b)) for b in d.splitlines()]
367 return br
367 return br
368 except ValueError:
368 except ValueError:
369 self._abort(error.ResponseError(_("unexpected response:"), d))
369 self._abort(error.ResponseError(_("unexpected response:"), d))
370
370
371 def between(self, pairs):
371 def between(self, pairs):
372 batch = 8 # avoid giant requests
372 batch = 8 # avoid giant requests
373 r = []
373 r = []
374 for i in xrange(0, len(pairs), batch):
374 for i in xrange(0, len(pairs), batch):
375 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
375 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
376 d = self._call("between", pairs=n)
376 d = self._call("between", pairs=n)
377 try:
377 try:
378 r.extend(l and decodelist(l) or [] for l in d.splitlines())
378 r.extend(l and decodelist(l) or [] for l in d.splitlines())
379 except ValueError:
379 except ValueError:
380 self._abort(error.ResponseError(_("unexpected response:"), d))
380 self._abort(error.ResponseError(_("unexpected response:"), d))
381 return r
381 return r
382
382
383 def changegroup(self, nodes, kind):
383 def changegroup(self, nodes, kind):
384 n = encodelist(nodes)
384 n = encodelist(nodes)
385 f = self._callcompressable("changegroup", roots=n)
385 f = self._callcompressable("changegroup", roots=n)
386 return changegroupmod.cg1unpacker(f, 'UN')
386 return changegroupmod.cg1unpacker(f, 'UN')
387
387
388 def changegroupsubset(self, bases, heads, kind):
388 def changegroupsubset(self, bases, heads, kind):
389 self.requirecap('changegroupsubset', _('look up remote changes'))
389 self.requirecap('changegroupsubset', _('look up remote changes'))
390 bases = encodelist(bases)
390 bases = encodelist(bases)
391 heads = encodelist(heads)
391 heads = encodelist(heads)
392 f = self._callcompressable("changegroupsubset",
392 f = self._callcompressable("changegroupsubset",
393 bases=bases, heads=heads)
393 bases=bases, heads=heads)
394 return changegroupmod.cg1unpacker(f, 'UN')
394 return changegroupmod.cg1unpacker(f, 'UN')
395
395
396 # End of baselegacywirepeer interface.
396 # End of baselegacywirepeer interface.
397
397
398 def _submitbatch(self, req):
398 def _submitbatch(self, req):
399 """run batch request <req> on the server
399 """run batch request <req> on the server
400
400
401 Returns an iterator of the raw responses from the server.
401 Returns an iterator of the raw responses from the server.
402 """
402 """
403 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
403 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
404 chunk = rsp.read(1024)
404 chunk = rsp.read(1024)
405 work = [chunk]
405 work = [chunk]
406 while chunk:
406 while chunk:
407 while ';' not in chunk and chunk:
407 while ';' not in chunk and chunk:
408 chunk = rsp.read(1024)
408 chunk = rsp.read(1024)
409 work.append(chunk)
409 work.append(chunk)
410 merged = ''.join(work)
410 merged = ''.join(work)
411 while ';' in merged:
411 while ';' in merged:
412 one, merged = merged.split(';', 1)
412 one, merged = merged.split(';', 1)
413 yield unescapearg(one)
413 yield unescapearg(one)
414 chunk = rsp.read(1024)
414 chunk = rsp.read(1024)
415 work = [merged, chunk]
415 work = [merged, chunk]
416 yield unescapearg(''.join(work))
416 yield unescapearg(''.join(work))
417
417
418 def _submitone(self, op, args):
418 def _submitone(self, op, args):
419 return self._call(op, **pycompat.strkwargs(args))
419 return self._call(op, **pycompat.strkwargs(args))
420
420
421 def debugwireargs(self, one, two, three=None, four=None, five=None):
421 def debugwireargs(self, one, two, three=None, four=None, five=None):
422 # don't pass optional arguments left at their default value
422 # don't pass optional arguments left at their default value
423 opts = {}
423 opts = {}
424 if three is not None:
424 if three is not None:
425 opts[r'three'] = three
425 opts[r'three'] = three
426 if four is not None:
426 if four is not None:
427 opts[r'four'] = four
427 opts[r'four'] = four
428 return self._call('debugwireargs', one=one, two=two, **opts)
428 return self._call('debugwireargs', one=one, two=two, **opts)
429
429
430 def _call(self, cmd, **args):
430 def _call(self, cmd, **args):
431 """execute <cmd> on the server
431 """execute <cmd> on the server
432
432
433 The command is expected to return a simple string.
433 The command is expected to return a simple string.
434
434
435 returns the server reply as a string."""
435 returns the server reply as a string."""
436 raise NotImplementedError()
436 raise NotImplementedError()
437
437
438 def _callstream(self, cmd, **args):
438 def _callstream(self, cmd, **args):
439 """execute <cmd> on the server
439 """execute <cmd> on the server
440
440
441 The command is expected to return a stream. Note that if the
441 The command is expected to return a stream. Note that if the
442 command doesn't return a stream, _callstream behaves
442 command doesn't return a stream, _callstream behaves
443 differently for ssh and http peers.
443 differently for ssh and http peers.
444
444
445 returns the server reply as a file like object.
445 returns the server reply as a file like object.
446 """
446 """
447 raise NotImplementedError()
447 raise NotImplementedError()
448
448
449 def _callcompressable(self, cmd, **args):
449 def _callcompressable(self, cmd, **args):
450 """execute <cmd> on the server
450 """execute <cmd> on the server
451
451
452 The command is expected to return a stream.
452 The command is expected to return a stream.
453
453
454 The stream may have been compressed in some implementations. This
454 The stream may have been compressed in some implementations. This
455 function takes care of the decompression. This is the only difference
455 function takes care of the decompression. This is the only difference
456 with _callstream.
456 with _callstream.
457
457
458 returns the server reply as a file like object.
458 returns the server reply as a file like object.
459 """
459 """
460 raise NotImplementedError()
460 raise NotImplementedError()
461
461
462 def _callpush(self, cmd, fp, **args):
462 def _callpush(self, cmd, fp, **args):
463 """execute a <cmd> on server
463 """execute a <cmd> on server
464
464
465 The command is expected to be related to a push. Push has a special
465 The command is expected to be related to a push. Push has a special
466 return method.
466 return method.
467
467
468 returns the server reply as a (ret, output) tuple. ret is either
468 returns the server reply as a (ret, output) tuple. ret is either
469 empty (error) or a stringified int.
469 empty (error) or a stringified int.
470 """
470 """
471 raise NotImplementedError()
471 raise NotImplementedError()
472
472
473 def _calltwowaystream(self, cmd, fp, **args):
473 def _calltwowaystream(self, cmd, fp, **args):
474 """execute <cmd> on server
474 """execute <cmd> on server
475
475
476 The command will send a stream to the server and get a stream in reply.
476 The command will send a stream to the server and get a stream in reply.
477 """
477 """
478 raise NotImplementedError()
478 raise NotImplementedError()
479
479
480 def _abort(self, exception):
480 def _abort(self, exception):
481 """clearly abort the wire protocol connection and raise the exception
481 """clearly abort the wire protocol connection and raise the exception
482 """
482 """
483 raise NotImplementedError()
483 raise NotImplementedError()
484
484
485 # server side
485 # server side
486
486
487 # wire protocol command can either return a string or one of these classes.
487 # wire protocol command can either return a string or one of these classes.
488
488
489 def getdispatchrepo(repo, proto, command):
489 def getdispatchrepo(repo, proto, command):
490 """Obtain the repo used for processing wire protocol commands.
490 """Obtain the repo used for processing wire protocol commands.
491
491
492 The intent of this function is to serve as a monkeypatch point for
492 The intent of this function is to serve as a monkeypatch point for
493 extensions that need commands to operate on different repo views under
493 extensions that need commands to operate on different repo views under
494 specialized circumstances.
494 specialized circumstances.
495 """
495 """
496 return repo.filtered('served')
496 return repo.filtered('served')
497
497
498 def dispatch(repo, proto, command):
498 def dispatch(repo, proto, command):
499 repo = getdispatchrepo(repo, proto, command)
499 repo = getdispatchrepo(repo, proto, command)
500 func, spec = commands[command]
500 func, spec = commands[command]
501 args = proto.getargs(spec)
501 args = proto.getargs(spec)
502 return func(repo, proto, *args)
502 return func(repo, proto, *args)
503
503
504 def options(cmd, keys, others):
504 def options(cmd, keys, others):
505 opts = {}
505 opts = {}
506 for k in keys:
506 for k in keys:
507 if k in others:
507 if k in others:
508 opts[k] = others[k]
508 opts[k] = others[k]
509 del others[k]
509 del others[k]
510 if others:
510 if others:
511 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
511 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
512 % (cmd, ",".join(others)))
512 % (cmd, ",".join(others)))
513 return opts
513 return opts
514
514
515 def bundle1allowed(repo, action):
515 def bundle1allowed(repo, action):
516 """Whether a bundle1 operation is allowed from the server.
516 """Whether a bundle1 operation is allowed from the server.
517
517
518 Priority is:
518 Priority is:
519
519
520 1. server.bundle1gd.<action> (if generaldelta active)
520 1. server.bundle1gd.<action> (if generaldelta active)
521 2. server.bundle1.<action>
521 2. server.bundle1.<action>
522 3. server.bundle1gd (if generaldelta active)
522 3. server.bundle1gd (if generaldelta active)
523 4. server.bundle1
523 4. server.bundle1
524 """
524 """
525 ui = repo.ui
525 ui = repo.ui
526 gd = 'generaldelta' in repo.requirements
526 gd = 'generaldelta' in repo.requirements
527
527
528 if gd:
528 if gd:
529 v = ui.configbool('server', 'bundle1gd.%s' % action)
529 v = ui.configbool('server', 'bundle1gd.%s' % action)
530 if v is not None:
530 if v is not None:
531 return v
531 return v
532
532
533 v = ui.configbool('server', 'bundle1.%s' % action)
533 v = ui.configbool('server', 'bundle1.%s' % action)
534 if v is not None:
534 if v is not None:
535 return v
535 return v
536
536
537 if gd:
537 if gd:
538 v = ui.configbool('server', 'bundle1gd')
538 v = ui.configbool('server', 'bundle1gd')
539 if v is not None:
539 if v is not None:
540 return v
540 return v
541
541
542 return ui.configbool('server', 'bundle1')
542 return ui.configbool('server', 'bundle1')
543
543
544 def supportedcompengines(ui, role):
544 def supportedcompengines(ui, role):
545 """Obtain the list of supported compression engines for a request."""
545 """Obtain the list of supported compression engines for a request."""
546 assert role in (util.CLIENTROLE, util.SERVERROLE)
546 assert role in (util.CLIENTROLE, util.SERVERROLE)
547
547
548 compengines = util.compengines.supportedwireengines(role)
548 compengines = util.compengines.supportedwireengines(role)
549
549
550 # Allow config to override default list and ordering.
550 # Allow config to override default list and ordering.
551 if role == util.SERVERROLE:
551 if role == util.SERVERROLE:
552 configengines = ui.configlist('server', 'compressionengines')
552 configengines = ui.configlist('server', 'compressionengines')
553 config = 'server.compressionengines'
553 config = 'server.compressionengines'
554 else:
554 else:
555 # This is currently implemented mainly to facilitate testing. In most
555 # This is currently implemented mainly to facilitate testing. In most
556 # cases, the server should be in charge of choosing a compression engine
556 # cases, the server should be in charge of choosing a compression engine
557 # because a server has the most to lose from a sub-optimal choice. (e.g.
557 # because a server has the most to lose from a sub-optimal choice. (e.g.
558 # CPU DoS due to an expensive engine or a network DoS due to poor
558 # CPU DoS due to an expensive engine or a network DoS due to poor
559 # compression ratio).
559 # compression ratio).
560 configengines = ui.configlist('experimental',
560 configengines = ui.configlist('experimental',
561 'clientcompressionengines')
561 'clientcompressionengines')
562 config = 'experimental.clientcompressionengines'
562 config = 'experimental.clientcompressionengines'
563
563
564 # No explicit config. Filter out the ones that aren't supposed to be
564 # No explicit config. Filter out the ones that aren't supposed to be
565 # advertised and return default ordering.
565 # advertised and return default ordering.
566 if not configengines:
566 if not configengines:
567 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
567 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
568 return [e for e in compengines
568 return [e for e in compengines
569 if getattr(e.wireprotosupport(), attr) > 0]
569 if getattr(e.wireprotosupport(), attr) > 0]
570
570
571 # If compression engines are listed in the config, assume there is a good
571 # If compression engines are listed in the config, assume there is a good
572 # reason for it (like server operators wanting to achieve specific
572 # reason for it (like server operators wanting to achieve specific
573 # performance characteristics). So fail fast if the config references
573 # performance characteristics). So fail fast if the config references
574 # unusable compression engines.
574 # unusable compression engines.
575 validnames = set(e.name() for e in compengines)
575 validnames = set(e.name() for e in compengines)
576 invalidnames = set(e for e in configengines if e not in validnames)
576 invalidnames = set(e for e in configengines if e not in validnames)
577 if invalidnames:
577 if invalidnames:
578 raise error.Abort(_('invalid compression engine defined in %s: %s') %
578 raise error.Abort(_('invalid compression engine defined in %s: %s') %
579 (config, ', '.join(sorted(invalidnames))))
579 (config, ', '.join(sorted(invalidnames))))
580
580
581 compengines = [e for e in compengines if e.name() in configengines]
581 compengines = [e for e in compengines if e.name() in configengines]
582 compengines = sorted(compengines,
582 compengines = sorted(compengines,
583 key=lambda e: configengines.index(e.name()))
583 key=lambda e: configengines.index(e.name()))
584
584
585 if not compengines:
585 if not compengines:
586 raise error.Abort(_('%s config option does not specify any known '
586 raise error.Abort(_('%s config option does not specify any known '
587 'compression engines') % config,
587 'compression engines') % config,
588 hint=_('usable compression engines: %s') %
588 hint=_('usable compression engines: %s') %
589 ', '.sorted(validnames))
589 ', '.sorted(validnames))
590
590
591 return compengines
591 return compengines
592
592
593 class commandentry(object):
593 class commandentry(object):
594 """Represents a declared wire protocol command."""
594 """Represents a declared wire protocol command."""
595 def __init__(self, func, args=''):
595 def __init__(self, func, args=''):
596 self.func = func
596 self.func = func
597 self.args = args
597 self.args = args
598
598
599 def _merge(self, func, args):
599 def _merge(self, func, args):
600 """Merge this instance with an incoming 2-tuple.
600 """Merge this instance with an incoming 2-tuple.
601
601
602 This is called when a caller using the old 2-tuple API attempts
602 This is called when a caller using the old 2-tuple API attempts
603 to replace an instance. The incoming values are merged with
603 to replace an instance. The incoming values are merged with
604 data not captured by the 2-tuple and a new instance containing
604 data not captured by the 2-tuple and a new instance containing
605 the union of the two objects is returned.
605 the union of the two objects is returned.
606 """
606 """
607 return commandentry(func, args)
607 return commandentry(func, args)
608
608
609 # Old code treats instances as 2-tuples. So expose that interface.
609 # Old code treats instances as 2-tuples. So expose that interface.
610 def __iter__(self):
610 def __iter__(self):
611 yield self.func
611 yield self.func
612 yield self.args
612 yield self.args
613
613
614 def __getitem__(self, i):
614 def __getitem__(self, i):
615 if i == 0:
615 if i == 0:
616 return self.func
616 return self.func
617 elif i == 1:
617 elif i == 1:
618 return self.args
618 return self.args
619 else:
619 else:
620 raise IndexError('can only access elements 0 and 1')
620 raise IndexError('can only access elements 0 and 1')
621
621
622 class commanddict(dict):
622 class commanddict(dict):
623 """Container for registered wire protocol commands.
623 """Container for registered wire protocol commands.
624
624
625 It behaves like a dict. But __setitem__ is overwritten to allow silent
625 It behaves like a dict. But __setitem__ is overwritten to allow silent
626 coercion of values from 2-tuples for API compatibility.
626 coercion of values from 2-tuples for API compatibility.
627 """
627 """
628 def __setitem__(self, k, v):
628 def __setitem__(self, k, v):
629 if isinstance(v, commandentry):
629 if isinstance(v, commandentry):
630 pass
630 pass
631 # Cast 2-tuples to commandentry instances.
631 # Cast 2-tuples to commandentry instances.
632 elif isinstance(v, tuple):
632 elif isinstance(v, tuple):
633 if len(v) != 2:
633 if len(v) != 2:
634 raise ValueError('command tuples must have exactly 2 elements')
634 raise ValueError('command tuples must have exactly 2 elements')
635
635
636 # It is common for extensions to wrap wire protocol commands via
636 # It is common for extensions to wrap wire protocol commands via
637 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
637 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
638 # doing this aren't aware of the new API that uses objects to store
638 # doing this aren't aware of the new API that uses objects to store
639 # command entries, we automatically merge old state with new.
639 # command entries, we automatically merge old state with new.
640 if k in self:
640 if k in self:
641 v = self[k]._merge(v[0], v[1])
641 v = self[k]._merge(v[0], v[1])
642 else:
642 else:
643 v = commandentry(v[0], v[1])
643 v = commandentry(v[0], v[1])
644 else:
644 else:
645 raise ValueError('command entries must be commandentry instances '
645 raise ValueError('command entries must be commandentry instances '
646 'or 2-tuples')
646 'or 2-tuples')
647
647
648 return super(commanddict, self).__setitem__(k, v)
648 return super(commanddict, self).__setitem__(k, v)
649
649
650 def commandavailable(self, command, proto):
650 def commandavailable(self, command, proto):
651 """Determine if a command is available for the requested protocol."""
651 """Determine if a command is available for the requested protocol."""
652 # For now, commands are available for all protocols. So do a simple
652 # For now, commands are available for all protocols. So do a simple
653 # membership test.
653 # membership test.
654 return command in self
654 return command in self
655
655
656 commands = commanddict()
656 commands = commanddict()
657
657
658 def wireprotocommand(name, args=''):
658 def wireprotocommand(name, args=''):
659 """Decorator to declare a wire protocol command.
659 """Decorator to declare a wire protocol command.
660
660
661 ``name`` is the name of the wire protocol command being provided.
661 ``name`` is the name of the wire protocol command being provided.
662
662
663 ``args`` is a space-delimited list of named arguments that the command
663 ``args`` is a space-delimited list of named arguments that the command
664 accepts. ``*`` is a special value that says to accept all arguments.
664 accepts. ``*`` is a special value that says to accept all arguments.
665 """
665 """
666 def register(func):
666 def register(func):
667 commands[name] = commandentry(func, args)
667 commands[name] = commandentry(func, args)
668 return func
668 return func
669 return register
669 return register
670
670
671 @wireprotocommand('batch', 'cmds *')
671 @wireprotocommand('batch', 'cmds *')
672 def batch(repo, proto, cmds, others):
672 def batch(repo, proto, cmds, others):
673 repo = repo.filtered("served")
673 repo = repo.filtered("served")
674 res = []
674 res = []
675 for pair in cmds.split(';'):
675 for pair in cmds.split(';'):
676 op, args = pair.split(' ', 1)
676 op, args = pair.split(' ', 1)
677 vals = {}
677 vals = {}
678 for a in args.split(','):
678 for a in args.split(','):
679 if a:
679 if a:
680 n, v = a.split('=')
680 n, v = a.split('=')
681 vals[unescapearg(n)] = unescapearg(v)
681 vals[unescapearg(n)] = unescapearg(v)
682 func, spec = commands[op]
682 func, spec = commands[op]
683 if spec:
683 if spec:
684 keys = spec.split()
684 keys = spec.split()
685 data = {}
685 data = {}
686 for k in keys:
686 for k in keys:
687 if k == '*':
687 if k == '*':
688 star = {}
688 star = {}
689 for key in vals.keys():
689 for key in vals.keys():
690 if key not in keys:
690 if key not in keys:
691 star[key] = vals[key]
691 star[key] = vals[key]
692 data['*'] = star
692 data['*'] = star
693 else:
693 else:
694 data[k] = vals[k]
694 data[k] = vals[k]
695 result = func(repo, proto, *[data[k] for k in keys])
695 result = func(repo, proto, *[data[k] for k in keys])
696 else:
696 else:
697 result = func(repo, proto)
697 result = func(repo, proto)
698 if isinstance(result, ooberror):
698 if isinstance(result, ooberror):
699 return result
699 return result
700
700
701 # For now, all batchable commands must return bytesresponse or
701 # For now, all batchable commands must return bytesresponse or
702 # raw bytes (for backwards compatibility).
702 # raw bytes (for backwards compatibility).
703 assert isinstance(result, (bytesresponse, bytes))
703 assert isinstance(result, (bytesresponse, bytes))
704 if isinstance(result, bytesresponse):
704 if isinstance(result, bytesresponse):
705 result = result.data
705 result = result.data
706 res.append(escapearg(result))
706 res.append(escapearg(result))
707
707
708 return bytesresponse(';'.join(res))
708 return bytesresponse(';'.join(res))
709
709
710 @wireprotocommand('between', 'pairs')
710 @wireprotocommand('between', 'pairs')
711 def between(repo, proto, pairs):
711 def between(repo, proto, pairs):
712 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
712 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
713 r = []
713 r = []
714 for b in repo.between(pairs):
714 for b in repo.between(pairs):
715 r.append(encodelist(b) + "\n")
715 r.append(encodelist(b) + "\n")
716
716
717 return bytesresponse(''.join(r))
717 return bytesresponse(''.join(r))
718
718
719 @wireprotocommand('branchmap')
719 @wireprotocommand('branchmap')
720 def branchmap(repo, proto):
720 def branchmap(repo, proto):
721 branchmap = repo.branchmap()
721 branchmap = repo.branchmap()
722 heads = []
722 heads = []
723 for branch, nodes in branchmap.iteritems():
723 for branch, nodes in branchmap.iteritems():
724 branchname = urlreq.quote(encoding.fromlocal(branch))
724 branchname = urlreq.quote(encoding.fromlocal(branch))
725 branchnodes = encodelist(nodes)
725 branchnodes = encodelist(nodes)
726 heads.append('%s %s' % (branchname, branchnodes))
726 heads.append('%s %s' % (branchname, branchnodes))
727
727
728 return bytesresponse('\n'.join(heads))
728 return bytesresponse('\n'.join(heads))
729
729
730 @wireprotocommand('branches', 'nodes')
730 @wireprotocommand('branches', 'nodes')
731 def branches(repo, proto, nodes):
731 def branches(repo, proto, nodes):
732 nodes = decodelist(nodes)
732 nodes = decodelist(nodes)
733 r = []
733 r = []
734 for b in repo.branches(nodes):
734 for b in repo.branches(nodes):
735 r.append(encodelist(b) + "\n")
735 r.append(encodelist(b) + "\n")
736
736
737 return bytesresponse(''.join(r))
737 return bytesresponse(''.join(r))
738
738
739 @wireprotocommand('clonebundles', '')
739 @wireprotocommand('clonebundles', '')
740 def clonebundles(repo, proto):
740 def clonebundles(repo, proto):
741 """Server command for returning info for available bundles to seed clones.
741 """Server command for returning info for available bundles to seed clones.
742
742
743 Clients will parse this response and determine what bundle to fetch.
743 Clients will parse this response and determine what bundle to fetch.
744
744
745 Extensions may wrap this command to filter or dynamically emit data
745 Extensions may wrap this command to filter or dynamically emit data
746 depending on the request. e.g. you could advertise URLs for the closest
746 depending on the request. e.g. you could advertise URLs for the closest
747 data center given the client's IP address.
747 data center given the client's IP address.
748 """
748 """
749 return bytesresponse(repo.vfs.tryread('clonebundles.manifest'))
749 return bytesresponse(repo.vfs.tryread('clonebundles.manifest'))
750
750
751 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
751 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
752 'known', 'getbundle', 'unbundlehash', 'batch']
752 'known', 'getbundle', 'unbundlehash', 'batch']
753
753
754 def _capabilities(repo, proto):
754 def _capabilities(repo, proto):
755 """return a list of capabilities for a repo
755 """return a list of capabilities for a repo
756
756
757 This function exists to allow extensions to easily wrap capabilities
757 This function exists to allow extensions to easily wrap capabilities
758 computation
758 computation
759
759
760 - returns a lists: easy to alter
760 - returns a lists: easy to alter
761 - change done here will be propagated to both `capabilities` and `hello`
761 - change done here will be propagated to both `capabilities` and `hello`
762 command without any other action needed.
762 command without any other action needed.
763 """
763 """
764 # copy to prevent modification of the global list
764 # copy to prevent modification of the global list
765 caps = list(wireprotocaps)
765 caps = list(wireprotocaps)
766 if streamclone.allowservergeneration(repo):
766 if streamclone.allowservergeneration(repo):
767 if repo.ui.configbool('server', 'preferuncompressed'):
767 if repo.ui.configbool('server', 'preferuncompressed'):
768 caps.append('stream-preferred')
768 caps.append('stream-preferred')
769 requiredformats = repo.requirements & repo.supportedformats
769 requiredformats = repo.requirements & repo.supportedformats
770 # if our local revlogs are just revlogv1, add 'stream' cap
770 # if our local revlogs are just revlogv1, add 'stream' cap
771 if not requiredformats - {'revlogv1'}:
771 if not requiredformats - {'revlogv1'}:
772 caps.append('stream')
772 caps.append('stream')
773 # otherwise, add 'streamreqs' detailing our local revlog format
773 # otherwise, add 'streamreqs' detailing our local revlog format
774 else:
774 else:
775 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
775 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
776 if repo.ui.configbool('experimental', 'bundle2-advertise'):
776 if repo.ui.configbool('experimental', 'bundle2-advertise'):
777 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
777 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
778 caps.append('bundle2=' + urlreq.quote(capsblob))
778 caps.append('bundle2=' + urlreq.quote(capsblob))
779 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
779 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
780
780
781 if proto.name == 'http-v1':
781 if proto.name == 'http-v1':
782 caps.append('httpheader=%d' %
782 caps.append('httpheader=%d' %
783 repo.ui.configint('server', 'maxhttpheaderlen'))
783 repo.ui.configint('server', 'maxhttpheaderlen'))
784 if repo.ui.configbool('experimental', 'httppostargs'):
784 if repo.ui.configbool('experimental', 'httppostargs'):
785 caps.append('httppostargs')
785 caps.append('httppostargs')
786
786
787 # FUTURE advertise 0.2rx once support is implemented
787 # FUTURE advertise 0.2rx once support is implemented
788 # FUTURE advertise minrx and mintx after consulting config option
788 # FUTURE advertise minrx and mintx after consulting config option
789 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
789 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
790
790
791 compengines = supportedcompengines(repo.ui, util.SERVERROLE)
791 compengines = supportedcompengines(repo.ui, util.SERVERROLE)
792 if compengines:
792 if compengines:
793 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
793 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
794 for e in compengines)
794 for e in compengines)
795 caps.append('compression=%s' % comptypes)
795 caps.append('compression=%s' % comptypes)
796
796
797 return caps
797 return caps
798
798
799 # If you are writing an extension and consider wrapping this function. Wrap
799 # If you are writing an extension and consider wrapping this function. Wrap
800 # `_capabilities` instead.
800 # `_capabilities` instead.
801 @wireprotocommand('capabilities')
801 @wireprotocommand('capabilities')
802 def capabilities(repo, proto):
802 def capabilities(repo, proto):
803 return bytesresponse(' '.join(_capabilities(repo, proto)))
803 return bytesresponse(' '.join(_capabilities(repo, proto)))
804
804
805 @wireprotocommand('changegroup', 'roots')
805 @wireprotocommand('changegroup', 'roots')
806 def changegroup(repo, proto, roots):
806 def changegroup(repo, proto, roots):
807 nodes = decodelist(roots)
807 nodes = decodelist(roots)
808 outgoing = discovery.outgoing(repo, missingroots=nodes,
808 outgoing = discovery.outgoing(repo, missingroots=nodes,
809 missingheads=repo.heads())
809 missingheads=repo.heads())
810 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
810 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
811 gen = iter(lambda: cg.read(32768), '')
811 gen = iter(lambda: cg.read(32768), '')
812 return streamres(gen=gen)
812 return streamres(gen=gen)
813
813
814 @wireprotocommand('changegroupsubset', 'bases heads')
814 @wireprotocommand('changegroupsubset', 'bases heads')
815 def changegroupsubset(repo, proto, bases, heads):
815 def changegroupsubset(repo, proto, bases, heads):
816 bases = decodelist(bases)
816 bases = decodelist(bases)
817 heads = decodelist(heads)
817 heads = decodelist(heads)
818 outgoing = discovery.outgoing(repo, missingroots=bases,
818 outgoing = discovery.outgoing(repo, missingroots=bases,
819 missingheads=heads)
819 missingheads=heads)
820 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
820 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
821 gen = iter(lambda: cg.read(32768), '')
821 gen = iter(lambda: cg.read(32768), '')
822 return streamres(gen=gen)
822 return streamres(gen=gen)
823
823
824 @wireprotocommand('debugwireargs', 'one two *')
824 @wireprotocommand('debugwireargs', 'one two *')
825 def debugwireargs(repo, proto, one, two, others):
825 def debugwireargs(repo, proto, one, two, others):
826 # only accept optional args from the known set
826 # only accept optional args from the known set
827 opts = options('debugwireargs', ['three', 'four'], others)
827 opts = options('debugwireargs', ['three', 'four'], others)
828 return bytesresponse(repo.debugwireargs(one, two,
828 return bytesresponse(repo.debugwireargs(one, two,
829 **pycompat.strkwargs(opts)))
829 **pycompat.strkwargs(opts)))
830
830
831 @wireprotocommand('getbundle', '*')
831 @wireprotocommand('getbundle', '*')
832 def getbundle(repo, proto, others):
832 def getbundle(repo, proto, others):
833 opts = options('getbundle', gboptsmap.keys(), others)
833 opts = options('getbundle', gboptsmap.keys(), others)
834 for k, v in opts.iteritems():
834 for k, v in opts.iteritems():
835 keytype = gboptsmap[k]
835 keytype = gboptsmap[k]
836 if keytype == 'nodes':
836 if keytype == 'nodes':
837 opts[k] = decodelist(v)
837 opts[k] = decodelist(v)
838 elif keytype == 'csv':
838 elif keytype == 'csv':
839 opts[k] = list(v.split(','))
839 opts[k] = list(v.split(','))
840 elif keytype == 'scsv':
840 elif keytype == 'scsv':
841 opts[k] = set(v.split(','))
841 opts[k] = set(v.split(','))
842 elif keytype == 'boolean':
842 elif keytype == 'boolean':
843 # Client should serialize False as '0', which is a non-empty string
843 # Client should serialize False as '0', which is a non-empty string
844 # so it evaluates as a True bool.
844 # so it evaluates as a True bool.
845 if v == '0':
845 if v == '0':
846 opts[k] = False
846 opts[k] = False
847 else:
847 else:
848 opts[k] = bool(v)
848 opts[k] = bool(v)
849 elif keytype != 'plain':
849 elif keytype != 'plain':
850 raise KeyError('unknown getbundle option type %s'
850 raise KeyError('unknown getbundle option type %s'
851 % keytype)
851 % keytype)
852
852
853 if not bundle1allowed(repo, 'pull'):
853 if not bundle1allowed(repo, 'pull'):
854 if not exchange.bundle2requested(opts.get('bundlecaps')):
854 if not exchange.bundle2requested(opts.get('bundlecaps')):
855 if proto.name == 'http-v1':
855 if proto.name == 'http-v1':
856 return ooberror(bundle2required)
856 return ooberror(bundle2required)
857 raise error.Abort(bundle2requiredmain,
857 raise error.Abort(bundle2requiredmain,
858 hint=bundle2requiredhint)
858 hint=bundle2requiredhint)
859
859
860 prefercompressed = True
860 prefercompressed = True
861
861
862 try:
862 try:
863 if repo.ui.configbool('server', 'disablefullbundle'):
863 if repo.ui.configbool('server', 'disablefullbundle'):
864 # Check to see if this is a full clone.
864 # Check to see if this is a full clone.
865 clheads = set(repo.changelog.heads())
865 clheads = set(repo.changelog.heads())
866 changegroup = opts.get('cg', True)
866 changegroup = opts.get('cg', True)
867 heads = set(opts.get('heads', set()))
867 heads = set(opts.get('heads', set()))
868 common = set(opts.get('common', set()))
868 common = set(opts.get('common', set()))
869 common.discard(nullid)
869 common.discard(nullid)
870 if changegroup and not common and clheads == heads:
870 if changegroup and not common and clheads == heads:
871 raise error.Abort(
871 raise error.Abort(
872 _('server has pull-based clones disabled'),
872 _('server has pull-based clones disabled'),
873 hint=_('remove --pull if specified or upgrade Mercurial'))
873 hint=_('remove --pull if specified or upgrade Mercurial'))
874
874
875 info, chunks = exchange.getbundlechunks(repo, 'serve',
875 info, chunks = exchange.getbundlechunks(repo, 'serve',
876 **pycompat.strkwargs(opts))
876 **pycompat.strkwargs(opts))
877 prefercompressed = info.get('prefercompressed', True)
877 prefercompressed = info.get('prefercompressed', True)
878 except error.Abort as exc:
878 except error.Abort as exc:
879 # cleanly forward Abort error to the client
879 # cleanly forward Abort error to the client
880 if not exchange.bundle2requested(opts.get('bundlecaps')):
880 if not exchange.bundle2requested(opts.get('bundlecaps')):
881 if proto.name == 'http-v1':
881 if proto.name == 'http-v1':
882 return ooberror(pycompat.bytestr(exc) + '\n')
882 return ooberror(pycompat.bytestr(exc) + '\n')
883 raise # cannot do better for bundle1 + ssh
883 raise # cannot do better for bundle1 + ssh
884 # bundle2 request expect a bundle2 reply
884 # bundle2 request expect a bundle2 reply
885 bundler = bundle2.bundle20(repo.ui)
885 bundler = bundle2.bundle20(repo.ui)
886 manargs = [('message', pycompat.bytestr(exc))]
886 manargs = [('message', pycompat.bytestr(exc))]
887 advargs = []
887 advargs = []
888 if exc.hint is not None:
888 if exc.hint is not None:
889 advargs.append(('hint', exc.hint))
889 advargs.append(('hint', exc.hint))
890 bundler.addpart(bundle2.bundlepart('error:abort',
890 bundler.addpart(bundle2.bundlepart('error:abort',
891 manargs, advargs))
891 manargs, advargs))
892 chunks = bundler.getchunks()
892 chunks = bundler.getchunks()
893 prefercompressed = False
893 prefercompressed = False
894
894
895 return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
895 return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
896
896
897 @wireprotocommand('heads')
897 @wireprotocommand('heads')
898 def heads(repo, proto):
898 def heads(repo, proto):
899 h = repo.heads()
899 h = repo.heads()
900 return bytesresponse(encodelist(h) + '\n')
900 return bytesresponse(encodelist(h) + '\n')
901
901
902 @wireprotocommand('hello')
902 @wireprotocommand('hello')
903 def hello(repo, proto):
903 def hello(repo, proto):
904 """Called as part of SSH handshake to obtain server info.
904 """Called as part of SSH handshake to obtain server info.
905
905
906 Returns a list of lines describing interesting things about the
906 Returns a list of lines describing interesting things about the
907 server, in an RFC822-like format.
907 server, in an RFC822-like format.
908
908
909 Currently, the only one defined is ``capabilities``, which consists of a
909 Currently, the only one defined is ``capabilities``, which consists of a
910 line of space separated tokens describing server abilities:
910 line of space separated tokens describing server abilities:
911
911
912 capabilities: <token0> <token1> <token2>
912 capabilities: <token0> <token1> <token2>
913 """
913 """
914 caps = capabilities(repo, proto).data
914 caps = capabilities(repo, proto).data
915 return bytesresponse('capabilities: %s\n' % caps)
915 return bytesresponse('capabilities: %s\n' % caps)
916
916
917 @wireprotocommand('listkeys', 'namespace')
917 @wireprotocommand('listkeys', 'namespace')
918 def listkeys(repo, proto, namespace):
918 def listkeys(repo, proto, namespace):
919 d = repo.listkeys(encoding.tolocal(namespace)).items()
919 d = repo.listkeys(encoding.tolocal(namespace)).items()
920 return bytesresponse(pushkeymod.encodekeys(d))
920 return bytesresponse(pushkeymod.encodekeys(d))
921
921
922 @wireprotocommand('lookup', 'key')
922 @wireprotocommand('lookup', 'key')
923 def lookup(repo, proto, key):
923 def lookup(repo, proto, key):
924 try:
924 try:
925 k = encoding.tolocal(key)
925 k = encoding.tolocal(key)
926 c = repo[k]
926 c = repo[k]
927 r = c.hex()
927 r = c.hex()
928 success = 1
928 success = 1
929 except Exception as inst:
929 except Exception as inst:
930 r = str(inst)
930 r = util.forcebytestr(inst)
931 success = 0
931 success = 0
932 return bytesresponse('%d %s\n' % (success, r))
932 return bytesresponse('%d %s\n' % (success, r))
933
933
934 @wireprotocommand('known', 'nodes *')
934 @wireprotocommand('known', 'nodes *')
935 def known(repo, proto, nodes, others):
935 def known(repo, proto, nodes, others):
936 v = ''.join(b and '1' or '0' for b in repo.known(decodelist(nodes)))
936 v = ''.join(b and '1' or '0' for b in repo.known(decodelist(nodes)))
937 return bytesresponse(v)
937 return bytesresponse(v)
938
938
939 @wireprotocommand('pushkey', 'namespace key old new')
939 @wireprotocommand('pushkey', 'namespace key old new')
940 def pushkey(repo, proto, namespace, key, old, new):
940 def pushkey(repo, proto, namespace, key, old, new):
941 # compatibility with pre-1.8 clients which were accidentally
941 # compatibility with pre-1.8 clients which were accidentally
942 # sending raw binary nodes rather than utf-8-encoded hex
942 # sending raw binary nodes rather than utf-8-encoded hex
943 if len(new) == 20 and util.escapestr(new) != new:
943 if len(new) == 20 and util.escapestr(new) != new:
944 # looks like it could be a binary node
944 # looks like it could be a binary node
945 try:
945 try:
946 new.decode('utf-8')
946 new.decode('utf-8')
947 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
947 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
948 except UnicodeDecodeError:
948 except UnicodeDecodeError:
949 pass # binary, leave unmodified
949 pass # binary, leave unmodified
950 else:
950 else:
951 new = encoding.tolocal(new) # normal path
951 new = encoding.tolocal(new) # normal path
952
952
953 with proto.mayberedirectstdio() as output:
953 with proto.mayberedirectstdio() as output:
954 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
954 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
955 encoding.tolocal(old), new) or False
955 encoding.tolocal(old), new) or False
956
956
957 output = output.getvalue() if output else ''
957 output = output.getvalue() if output else ''
958 return bytesresponse('%s\n%s' % (int(r), output))
958 return bytesresponse('%s\n%s' % (int(r), output))
959
959
960 @wireprotocommand('stream_out')
960 @wireprotocommand('stream_out')
961 def stream(repo, proto):
961 def stream(repo, proto):
962 '''If the server supports streaming clone, it advertises the "stream"
962 '''If the server supports streaming clone, it advertises the "stream"
963 capability with a value representing the version and flags of the repo
963 capability with a value representing the version and flags of the repo
964 it is serving. Client checks to see if it understands the format.
964 it is serving. Client checks to see if it understands the format.
965 '''
965 '''
966 return streamres_legacy(streamclone.generatev1wireproto(repo))
966 return streamres_legacy(streamclone.generatev1wireproto(repo))
967
967
968 @wireprotocommand('unbundle', 'heads')
968 @wireprotocommand('unbundle', 'heads')
969 def unbundle(repo, proto, heads):
969 def unbundle(repo, proto, heads):
970 their_heads = decodelist(heads)
970 their_heads = decodelist(heads)
971
971
972 with proto.mayberedirectstdio() as output:
972 with proto.mayberedirectstdio() as output:
973 try:
973 try:
974 exchange.check_heads(repo, their_heads, 'preparing changes')
974 exchange.check_heads(repo, their_heads, 'preparing changes')
975
975
976 # write bundle data to temporary file because it can be big
976 # write bundle data to temporary file because it can be big
977 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
977 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
978 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
978 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
979 r = 0
979 r = 0
980 try:
980 try:
981 proto.forwardpayload(fp)
981 proto.forwardpayload(fp)
982 fp.seek(0)
982 fp.seek(0)
983 gen = exchange.readbundle(repo.ui, fp, None)
983 gen = exchange.readbundle(repo.ui, fp, None)
984 if (isinstance(gen, changegroupmod.cg1unpacker)
984 if (isinstance(gen, changegroupmod.cg1unpacker)
985 and not bundle1allowed(repo, 'push')):
985 and not bundle1allowed(repo, 'push')):
986 if proto.name == 'http-v1':
986 if proto.name == 'http-v1':
987 # need to special case http because stderr do not get to
987 # need to special case http because stderr do not get to
988 # the http client on failed push so we need to abuse
988 # the http client on failed push so we need to abuse
989 # some other error type to make sure the message get to
989 # some other error type to make sure the message get to
990 # the user.
990 # the user.
991 return ooberror(bundle2required)
991 return ooberror(bundle2required)
992 raise error.Abort(bundle2requiredmain,
992 raise error.Abort(bundle2requiredmain,
993 hint=bundle2requiredhint)
993 hint=bundle2requiredhint)
994
994
995 r = exchange.unbundle(repo, gen, their_heads, 'serve',
995 r = exchange.unbundle(repo, gen, their_heads, 'serve',
996 proto.client())
996 proto.client())
997 if util.safehasattr(r, 'addpart'):
997 if util.safehasattr(r, 'addpart'):
998 # The return looks streamable, we are in the bundle2 case
998 # The return looks streamable, we are in the bundle2 case
999 # and should return a stream.
999 # and should return a stream.
1000 return streamres_legacy(gen=r.getchunks())
1000 return streamres_legacy(gen=r.getchunks())
1001 return pushres(r, output.getvalue() if output else '')
1001 return pushres(r, output.getvalue() if output else '')
1002
1002
1003 finally:
1003 finally:
1004 fp.close()
1004 fp.close()
1005 os.unlink(tempname)
1005 os.unlink(tempname)
1006
1006
1007 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1007 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1008 # handle non-bundle2 case first
1008 # handle non-bundle2 case first
1009 if not getattr(exc, 'duringunbundle2', False):
1009 if not getattr(exc, 'duringunbundle2', False):
1010 try:
1010 try:
1011 raise
1011 raise
1012 except error.Abort:
1012 except error.Abort:
1013 # The old code we moved used util.stderr directly.
1013 # The old code we moved used util.stderr directly.
1014 # We did not change it to minimise code change.
1014 # We did not change it to minimise code change.
1015 # This need to be moved to something proper.
1015 # This need to be moved to something proper.
1016 # Feel free to do it.
1016 # Feel free to do it.
1017 util.stderr.write("abort: %s\n" % exc)
1017 util.stderr.write("abort: %s\n" % exc)
1018 if exc.hint is not None:
1018 if exc.hint is not None:
1019 util.stderr.write("(%s)\n" % exc.hint)
1019 util.stderr.write("(%s)\n" % exc.hint)
1020 return pushres(0, output.getvalue() if output else '')
1020 return pushres(0, output.getvalue() if output else '')
1021 except error.PushRaced:
1021 except error.PushRaced:
1022 return pusherr(str(exc),
1022 return pusherr(str(exc),
1023 output.getvalue() if output else '')
1023 output.getvalue() if output else '')
1024
1024
1025 bundler = bundle2.bundle20(repo.ui)
1025 bundler = bundle2.bundle20(repo.ui)
1026 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1026 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1027 bundler.addpart(out)
1027 bundler.addpart(out)
1028 try:
1028 try:
1029 try:
1029 try:
1030 raise
1030 raise
1031 except error.PushkeyFailed as exc:
1031 except error.PushkeyFailed as exc:
1032 # check client caps
1032 # check client caps
1033 remotecaps = getattr(exc, '_replycaps', None)
1033 remotecaps = getattr(exc, '_replycaps', None)
1034 if (remotecaps is not None
1034 if (remotecaps is not None
1035 and 'pushkey' not in remotecaps.get('error', ())):
1035 and 'pushkey' not in remotecaps.get('error', ())):
1036 # no support remote side, fallback to Abort handler.
1036 # no support remote side, fallback to Abort handler.
1037 raise
1037 raise
1038 part = bundler.newpart('error:pushkey')
1038 part = bundler.newpart('error:pushkey')
1039 part.addparam('in-reply-to', exc.partid)
1039 part.addparam('in-reply-to', exc.partid)
1040 if exc.namespace is not None:
1040 if exc.namespace is not None:
1041 part.addparam('namespace', exc.namespace,
1041 part.addparam('namespace', exc.namespace,
1042 mandatory=False)
1042 mandatory=False)
1043 if exc.key is not None:
1043 if exc.key is not None:
1044 part.addparam('key', exc.key, mandatory=False)
1044 part.addparam('key', exc.key, mandatory=False)
1045 if exc.new is not None:
1045 if exc.new is not None:
1046 part.addparam('new', exc.new, mandatory=False)
1046 part.addparam('new', exc.new, mandatory=False)
1047 if exc.old is not None:
1047 if exc.old is not None:
1048 part.addparam('old', exc.old, mandatory=False)
1048 part.addparam('old', exc.old, mandatory=False)
1049 if exc.ret is not None:
1049 if exc.ret is not None:
1050 part.addparam('ret', exc.ret, mandatory=False)
1050 part.addparam('ret', exc.ret, mandatory=False)
1051 except error.BundleValueError as exc:
1051 except error.BundleValueError as exc:
1052 errpart = bundler.newpart('error:unsupportedcontent')
1052 errpart = bundler.newpart('error:unsupportedcontent')
1053 if exc.parttype is not None:
1053 if exc.parttype is not None:
1054 errpart.addparam('parttype', exc.parttype)
1054 errpart.addparam('parttype', exc.parttype)
1055 if exc.params:
1055 if exc.params:
1056 errpart.addparam('params', '\0'.join(exc.params))
1056 errpart.addparam('params', '\0'.join(exc.params))
1057 except error.Abort as exc:
1057 except error.Abort as exc:
1058 manargs = [('message', str(exc))]
1058 manargs = [('message', util.forcebytestr(exc))]
1059 advargs = []
1059 advargs = []
1060 if exc.hint is not None:
1060 if exc.hint is not None:
1061 advargs.append(('hint', exc.hint))
1061 advargs.append(('hint', exc.hint))
1062 bundler.addpart(bundle2.bundlepart('error:abort',
1062 bundler.addpart(bundle2.bundlepart('error:abort',
1063 manargs, advargs))
1063 manargs, advargs))
1064 except error.PushRaced as exc:
1064 except error.PushRaced as exc:
1065 bundler.newpart('error:pushraced', [('message', str(exc))])
1065 bundler.newpart('error:pushraced',
1066 [('message', util.forcebytestr(exc))])
1066 return streamres_legacy(gen=bundler.getchunks())
1067 return streamres_legacy(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now