##// END OF EJS Templates
wireproto: define and use types for wire protocol commands...
Gregory Szorc -
r35999:ef683a0f default
parent child Browse files
Show More
@@ -1,1039 +1,1095 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import os
11 import os
12 import tempfile
12 import tempfile
13
13
14 from .i18n import _
14 from .i18n import _
15 from .node import (
15 from .node import (
16 bin,
16 bin,
17 hex,
17 hex,
18 nullid,
18 nullid,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 discovery,
24 discovery,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 pycompat,
30 pycompat,
31 repository,
31 repository,
32 streamclone,
32 streamclone,
33 util,
33 util,
34 )
34 )
35
35
36 urlerr = util.urlerr
36 urlerr = util.urlerr
37 urlreq = util.urlreq
37 urlreq = util.urlreq
38
38
39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 'IncompatibleClient')
41 'IncompatibleClient')
42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43
43
44 class remoteiterbatcher(peer.iterbatcher):
44 class remoteiterbatcher(peer.iterbatcher):
45 def __init__(self, remote):
45 def __init__(self, remote):
46 super(remoteiterbatcher, self).__init__()
46 super(remoteiterbatcher, self).__init__()
47 self._remote = remote
47 self._remote = remote
48
48
49 def __getattr__(self, name):
49 def __getattr__(self, name):
50 # Validate this method is batchable, since submit() only supports
50 # Validate this method is batchable, since submit() only supports
51 # batchable methods.
51 # batchable methods.
52 fn = getattr(self._remote, name)
52 fn = getattr(self._remote, name)
53 if not getattr(fn, 'batchable', None):
53 if not getattr(fn, 'batchable', None):
54 raise error.ProgrammingError('Attempted to batch a non-batchable '
54 raise error.ProgrammingError('Attempted to batch a non-batchable '
55 'call to %r' % name)
55 'call to %r' % name)
56
56
57 return super(remoteiterbatcher, self).__getattr__(name)
57 return super(remoteiterbatcher, self).__getattr__(name)
58
58
59 def submit(self):
59 def submit(self):
60 """Break the batch request into many patch calls and pipeline them.
60 """Break the batch request into many patch calls and pipeline them.
61
61
62 This is mostly valuable over http where request sizes can be
62 This is mostly valuable over http where request sizes can be
63 limited, but can be used in other places as well.
63 limited, but can be used in other places as well.
64 """
64 """
65 # 2-tuple of (command, arguments) that represents what will be
65 # 2-tuple of (command, arguments) that represents what will be
66 # sent over the wire.
66 # sent over the wire.
67 requests = []
67 requests = []
68
68
69 # 4-tuple of (command, final future, @batchable generator, remote
69 # 4-tuple of (command, final future, @batchable generator, remote
70 # future).
70 # future).
71 results = []
71 results = []
72
72
73 for command, args, opts, finalfuture in self.calls:
73 for command, args, opts, finalfuture in self.calls:
74 mtd = getattr(self._remote, command)
74 mtd = getattr(self._remote, command)
75 batchable = mtd.batchable(mtd.__self__, *args, **opts)
75 batchable = mtd.batchable(mtd.__self__, *args, **opts)
76
76
77 commandargs, fremote = next(batchable)
77 commandargs, fremote = next(batchable)
78 assert fremote
78 assert fremote
79 requests.append((command, commandargs))
79 requests.append((command, commandargs))
80 results.append((command, finalfuture, batchable, fremote))
80 results.append((command, finalfuture, batchable, fremote))
81
81
82 if requests:
82 if requests:
83 self._resultiter = self._remote._submitbatch(requests)
83 self._resultiter = self._remote._submitbatch(requests)
84
84
85 self._results = results
85 self._results = results
86
86
87 def results(self):
87 def results(self):
88 for command, finalfuture, batchable, remotefuture in self._results:
88 for command, finalfuture, batchable, remotefuture in self._results:
89 # Get the raw result, set it in the remote future, feed it
89 # Get the raw result, set it in the remote future, feed it
90 # back into the @batchable generator so it can be decoded, and
90 # back into the @batchable generator so it can be decoded, and
91 # set the result on the final future to this value.
91 # set the result on the final future to this value.
92 remoteresult = next(self._resultiter)
92 remoteresult = next(self._resultiter)
93 remotefuture.set(remoteresult)
93 remotefuture.set(remoteresult)
94 finalfuture.set(next(batchable))
94 finalfuture.set(next(batchable))
95
95
96 # Verify our @batchable generators only emit 2 values.
96 # Verify our @batchable generators only emit 2 values.
97 try:
97 try:
98 next(batchable)
98 next(batchable)
99 except StopIteration:
99 except StopIteration:
100 pass
100 pass
101 else:
101 else:
102 raise error.ProgrammingError('%s @batchable generator emitted '
102 raise error.ProgrammingError('%s @batchable generator emitted '
103 'unexpected value count' % command)
103 'unexpected value count' % command)
104
104
105 yield finalfuture.value
105 yield finalfuture.value
106
106
107 # Forward a couple of names from peer to make wireproto interactions
107 # Forward a couple of names from peer to make wireproto interactions
108 # slightly more sensible.
108 # slightly more sensible.
109 batchable = peer.batchable
109 batchable = peer.batchable
110 future = peer.future
110 future = peer.future
111
111
112 # list of nodes encoding / decoding
112 # list of nodes encoding / decoding
113
113
114 def decodelist(l, sep=' '):
114 def decodelist(l, sep=' '):
115 if l:
115 if l:
116 return [bin(v) for v in l.split(sep)]
116 return [bin(v) for v in l.split(sep)]
117 return []
117 return []
118
118
119 def encodelist(l, sep=' '):
119 def encodelist(l, sep=' '):
120 try:
120 try:
121 return sep.join(map(hex, l))
121 return sep.join(map(hex, l))
122 except TypeError:
122 except TypeError:
123 raise
123 raise
124
124
125 # batched call argument encoding
125 # batched call argument encoding
126
126
127 def escapearg(plain):
127 def escapearg(plain):
128 return (plain
128 return (plain
129 .replace(':', ':c')
129 .replace(':', ':c')
130 .replace(',', ':o')
130 .replace(',', ':o')
131 .replace(';', ':s')
131 .replace(';', ':s')
132 .replace('=', ':e'))
132 .replace('=', ':e'))
133
133
134 def unescapearg(escaped):
134 def unescapearg(escaped):
135 return (escaped
135 return (escaped
136 .replace(':e', '=')
136 .replace(':e', '=')
137 .replace(':s', ';')
137 .replace(':s', ';')
138 .replace(':o', ',')
138 .replace(':o', ',')
139 .replace(':c', ':'))
139 .replace(':c', ':'))
140
140
141 def encodebatchcmds(req):
141 def encodebatchcmds(req):
142 """Return a ``cmds`` argument value for the ``batch`` command."""
142 """Return a ``cmds`` argument value for the ``batch`` command."""
143 cmds = []
143 cmds = []
144 for op, argsdict in req:
144 for op, argsdict in req:
145 # Old servers didn't properly unescape argument names. So prevent
145 # Old servers didn't properly unescape argument names. So prevent
146 # the sending of argument names that may not be decoded properly by
146 # the sending of argument names that may not be decoded properly by
147 # servers.
147 # servers.
148 assert all(escapearg(k) == k for k in argsdict)
148 assert all(escapearg(k) == k for k in argsdict)
149
149
150 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
150 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
151 for k, v in argsdict.iteritems())
151 for k, v in argsdict.iteritems())
152 cmds.append('%s %s' % (op, args))
152 cmds.append('%s %s' % (op, args))
153
153
154 return ';'.join(cmds)
154 return ';'.join(cmds)
155
155
156 # mapping of options accepted by getbundle and their types
156 # mapping of options accepted by getbundle and their types
157 #
157 #
158 # Meant to be extended by extensions. It is extensions responsibility to ensure
158 # Meant to be extended by extensions. It is extensions responsibility to ensure
159 # such options are properly processed in exchange.getbundle.
159 # such options are properly processed in exchange.getbundle.
160 #
160 #
161 # supported types are:
161 # supported types are:
162 #
162 #
163 # :nodes: list of binary nodes
163 # :nodes: list of binary nodes
164 # :csv: list of comma-separated values
164 # :csv: list of comma-separated values
165 # :scsv: list of comma-separated values return as set
165 # :scsv: list of comma-separated values return as set
166 # :plain: string with no transformation needed.
166 # :plain: string with no transformation needed.
167 gboptsmap = {'heads': 'nodes',
167 gboptsmap = {'heads': 'nodes',
168 'bookmarks': 'boolean',
168 'bookmarks': 'boolean',
169 'common': 'nodes',
169 'common': 'nodes',
170 'obsmarkers': 'boolean',
170 'obsmarkers': 'boolean',
171 'phases': 'boolean',
171 'phases': 'boolean',
172 'bundlecaps': 'scsv',
172 'bundlecaps': 'scsv',
173 'listkeys': 'csv',
173 'listkeys': 'csv',
174 'cg': 'boolean',
174 'cg': 'boolean',
175 'cbattempted': 'boolean',
175 'cbattempted': 'boolean',
176 'stream': 'boolean',
176 'stream': 'boolean',
177 }
177 }
178
178
179 # client side
179 # client side
180
180
181 class wirepeer(repository.legacypeer):
181 class wirepeer(repository.legacypeer):
182 """Client-side interface for communicating with a peer repository.
182 """Client-side interface for communicating with a peer repository.
183
183
184 Methods commonly call wire protocol commands of the same name.
184 Methods commonly call wire protocol commands of the same name.
185
185
186 See also httppeer.py and sshpeer.py for protocol-specific
186 See also httppeer.py and sshpeer.py for protocol-specific
187 implementations of this interface.
187 implementations of this interface.
188 """
188 """
189 # Begin of basewirepeer interface.
189 # Begin of basewirepeer interface.
190
190
191 def iterbatch(self):
191 def iterbatch(self):
192 return remoteiterbatcher(self)
192 return remoteiterbatcher(self)
193
193
194 @batchable
194 @batchable
195 def lookup(self, key):
195 def lookup(self, key):
196 self.requirecap('lookup', _('look up remote revision'))
196 self.requirecap('lookup', _('look up remote revision'))
197 f = future()
197 f = future()
198 yield {'key': encoding.fromlocal(key)}, f
198 yield {'key': encoding.fromlocal(key)}, f
199 d = f.value
199 d = f.value
200 success, data = d[:-1].split(" ", 1)
200 success, data = d[:-1].split(" ", 1)
201 if int(success):
201 if int(success):
202 yield bin(data)
202 yield bin(data)
203 else:
203 else:
204 self._abort(error.RepoError(data))
204 self._abort(error.RepoError(data))
205
205
206 @batchable
206 @batchable
207 def heads(self):
207 def heads(self):
208 f = future()
208 f = future()
209 yield {}, f
209 yield {}, f
210 d = f.value
210 d = f.value
211 try:
211 try:
212 yield decodelist(d[:-1])
212 yield decodelist(d[:-1])
213 except ValueError:
213 except ValueError:
214 self._abort(error.ResponseError(_("unexpected response:"), d))
214 self._abort(error.ResponseError(_("unexpected response:"), d))
215
215
216 @batchable
216 @batchable
217 def known(self, nodes):
217 def known(self, nodes):
218 f = future()
218 f = future()
219 yield {'nodes': encodelist(nodes)}, f
219 yield {'nodes': encodelist(nodes)}, f
220 d = f.value
220 d = f.value
221 try:
221 try:
222 yield [bool(int(b)) for b in d]
222 yield [bool(int(b)) for b in d]
223 except ValueError:
223 except ValueError:
224 self._abort(error.ResponseError(_("unexpected response:"), d))
224 self._abort(error.ResponseError(_("unexpected response:"), d))
225
225
226 @batchable
226 @batchable
227 def branchmap(self):
227 def branchmap(self):
228 f = future()
228 f = future()
229 yield {}, f
229 yield {}, f
230 d = f.value
230 d = f.value
231 try:
231 try:
232 branchmap = {}
232 branchmap = {}
233 for branchpart in d.splitlines():
233 for branchpart in d.splitlines():
234 branchname, branchheads = branchpart.split(' ', 1)
234 branchname, branchheads = branchpart.split(' ', 1)
235 branchname = encoding.tolocal(urlreq.unquote(branchname))
235 branchname = encoding.tolocal(urlreq.unquote(branchname))
236 branchheads = decodelist(branchheads)
236 branchheads = decodelist(branchheads)
237 branchmap[branchname] = branchheads
237 branchmap[branchname] = branchheads
238 yield branchmap
238 yield branchmap
239 except TypeError:
239 except TypeError:
240 self._abort(error.ResponseError(_("unexpected response:"), d))
240 self._abort(error.ResponseError(_("unexpected response:"), d))
241
241
242 @batchable
242 @batchable
243 def listkeys(self, namespace):
243 def listkeys(self, namespace):
244 if not self.capable('pushkey'):
244 if not self.capable('pushkey'):
245 yield {}, None
245 yield {}, None
246 f = future()
246 f = future()
247 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
247 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
248 yield {'namespace': encoding.fromlocal(namespace)}, f
248 yield {'namespace': encoding.fromlocal(namespace)}, f
249 d = f.value
249 d = f.value
250 self.ui.debug('received listkey for "%s": %i bytes\n'
250 self.ui.debug('received listkey for "%s": %i bytes\n'
251 % (namespace, len(d)))
251 % (namespace, len(d)))
252 yield pushkeymod.decodekeys(d)
252 yield pushkeymod.decodekeys(d)
253
253
254 @batchable
254 @batchable
255 def pushkey(self, namespace, key, old, new):
255 def pushkey(self, namespace, key, old, new):
256 if not self.capable('pushkey'):
256 if not self.capable('pushkey'):
257 yield False, None
257 yield False, None
258 f = future()
258 f = future()
259 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
259 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
260 yield {'namespace': encoding.fromlocal(namespace),
260 yield {'namespace': encoding.fromlocal(namespace),
261 'key': encoding.fromlocal(key),
261 'key': encoding.fromlocal(key),
262 'old': encoding.fromlocal(old),
262 'old': encoding.fromlocal(old),
263 'new': encoding.fromlocal(new)}, f
263 'new': encoding.fromlocal(new)}, f
264 d = f.value
264 d = f.value
265 d, output = d.split('\n', 1)
265 d, output = d.split('\n', 1)
266 try:
266 try:
267 d = bool(int(d))
267 d = bool(int(d))
268 except ValueError:
268 except ValueError:
269 raise error.ResponseError(
269 raise error.ResponseError(
270 _('push failed (unexpected response):'), d)
270 _('push failed (unexpected response):'), d)
271 for l in output.splitlines(True):
271 for l in output.splitlines(True):
272 self.ui.status(_('remote: '), l)
272 self.ui.status(_('remote: '), l)
273 yield d
273 yield d
274
274
275 def stream_out(self):
275 def stream_out(self):
276 return self._callstream('stream_out')
276 return self._callstream('stream_out')
277
277
278 def getbundle(self, source, **kwargs):
278 def getbundle(self, source, **kwargs):
279 kwargs = pycompat.byteskwargs(kwargs)
279 kwargs = pycompat.byteskwargs(kwargs)
280 self.requirecap('getbundle', _('look up remote changes'))
280 self.requirecap('getbundle', _('look up remote changes'))
281 opts = {}
281 opts = {}
282 bundlecaps = kwargs.get('bundlecaps')
282 bundlecaps = kwargs.get('bundlecaps')
283 if bundlecaps is not None:
283 if bundlecaps is not None:
284 kwargs['bundlecaps'] = sorted(bundlecaps)
284 kwargs['bundlecaps'] = sorted(bundlecaps)
285 else:
285 else:
286 bundlecaps = () # kwargs could have it to None
286 bundlecaps = () # kwargs could have it to None
287 for key, value in kwargs.iteritems():
287 for key, value in kwargs.iteritems():
288 if value is None:
288 if value is None:
289 continue
289 continue
290 keytype = gboptsmap.get(key)
290 keytype = gboptsmap.get(key)
291 if keytype is None:
291 if keytype is None:
292 raise error.ProgrammingError(
292 raise error.ProgrammingError(
293 'Unexpectedly None keytype for key %s' % key)
293 'Unexpectedly None keytype for key %s' % key)
294 elif keytype == 'nodes':
294 elif keytype == 'nodes':
295 value = encodelist(value)
295 value = encodelist(value)
296 elif keytype in ('csv', 'scsv'):
296 elif keytype in ('csv', 'scsv'):
297 value = ','.join(value)
297 value = ','.join(value)
298 elif keytype == 'boolean':
298 elif keytype == 'boolean':
299 value = '%i' % bool(value)
299 value = '%i' % bool(value)
300 elif keytype != 'plain':
300 elif keytype != 'plain':
301 raise KeyError('unknown getbundle option type %s'
301 raise KeyError('unknown getbundle option type %s'
302 % keytype)
302 % keytype)
303 opts[key] = value
303 opts[key] = value
304 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
304 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
305 if any((cap.startswith('HG2') for cap in bundlecaps)):
305 if any((cap.startswith('HG2') for cap in bundlecaps)):
306 return bundle2.getunbundler(self.ui, f)
306 return bundle2.getunbundler(self.ui, f)
307 else:
307 else:
308 return changegroupmod.cg1unpacker(f, 'UN')
308 return changegroupmod.cg1unpacker(f, 'UN')
309
309
310 def unbundle(self, cg, heads, url):
310 def unbundle(self, cg, heads, url):
311 '''Send cg (a readable file-like object representing the
311 '''Send cg (a readable file-like object representing the
312 changegroup to push, typically a chunkbuffer object) to the
312 changegroup to push, typically a chunkbuffer object) to the
313 remote server as a bundle.
313 remote server as a bundle.
314
314
315 When pushing a bundle10 stream, return an integer indicating the
315 When pushing a bundle10 stream, return an integer indicating the
316 result of the push (see changegroup.apply()).
316 result of the push (see changegroup.apply()).
317
317
318 When pushing a bundle20 stream, return a bundle20 stream.
318 When pushing a bundle20 stream, return a bundle20 stream.
319
319
320 `url` is the url the client thinks it's pushing to, which is
320 `url` is the url the client thinks it's pushing to, which is
321 visible to hooks.
321 visible to hooks.
322 '''
322 '''
323
323
324 if heads != ['force'] and self.capable('unbundlehash'):
324 if heads != ['force'] and self.capable('unbundlehash'):
325 heads = encodelist(['hashed',
325 heads = encodelist(['hashed',
326 hashlib.sha1(''.join(sorted(heads))).digest()])
326 hashlib.sha1(''.join(sorted(heads))).digest()])
327 else:
327 else:
328 heads = encodelist(heads)
328 heads = encodelist(heads)
329
329
330 if util.safehasattr(cg, 'deltaheader'):
330 if util.safehasattr(cg, 'deltaheader'):
331 # this a bundle10, do the old style call sequence
331 # this a bundle10, do the old style call sequence
332 ret, output = self._callpush("unbundle", cg, heads=heads)
332 ret, output = self._callpush("unbundle", cg, heads=heads)
333 if ret == "":
333 if ret == "":
334 raise error.ResponseError(
334 raise error.ResponseError(
335 _('push failed:'), output)
335 _('push failed:'), output)
336 try:
336 try:
337 ret = int(ret)
337 ret = int(ret)
338 except ValueError:
338 except ValueError:
339 raise error.ResponseError(
339 raise error.ResponseError(
340 _('push failed (unexpected response):'), ret)
340 _('push failed (unexpected response):'), ret)
341
341
342 for l in output.splitlines(True):
342 for l in output.splitlines(True):
343 self.ui.status(_('remote: '), l)
343 self.ui.status(_('remote: '), l)
344 else:
344 else:
345 # bundle2 push. Send a stream, fetch a stream.
345 # bundle2 push. Send a stream, fetch a stream.
346 stream = self._calltwowaystream('unbundle', cg, heads=heads)
346 stream = self._calltwowaystream('unbundle', cg, heads=heads)
347 ret = bundle2.getunbundler(self.ui, stream)
347 ret = bundle2.getunbundler(self.ui, stream)
348 return ret
348 return ret
349
349
350 # End of basewirepeer interface.
350 # End of basewirepeer interface.
351
351
352 # Begin of baselegacywirepeer interface.
352 # Begin of baselegacywirepeer interface.
353
353
354 def branches(self, nodes):
354 def branches(self, nodes):
355 n = encodelist(nodes)
355 n = encodelist(nodes)
356 d = self._call("branches", nodes=n)
356 d = self._call("branches", nodes=n)
357 try:
357 try:
358 br = [tuple(decodelist(b)) for b in d.splitlines()]
358 br = [tuple(decodelist(b)) for b in d.splitlines()]
359 return br
359 return br
360 except ValueError:
360 except ValueError:
361 self._abort(error.ResponseError(_("unexpected response:"), d))
361 self._abort(error.ResponseError(_("unexpected response:"), d))
362
362
363 def between(self, pairs):
363 def between(self, pairs):
364 batch = 8 # avoid giant requests
364 batch = 8 # avoid giant requests
365 r = []
365 r = []
366 for i in xrange(0, len(pairs), batch):
366 for i in xrange(0, len(pairs), batch):
367 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
367 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
368 d = self._call("between", pairs=n)
368 d = self._call("between", pairs=n)
369 try:
369 try:
370 r.extend(l and decodelist(l) or [] for l in d.splitlines())
370 r.extend(l and decodelist(l) or [] for l in d.splitlines())
371 except ValueError:
371 except ValueError:
372 self._abort(error.ResponseError(_("unexpected response:"), d))
372 self._abort(error.ResponseError(_("unexpected response:"), d))
373 return r
373 return r
374
374
375 def changegroup(self, nodes, kind):
375 def changegroup(self, nodes, kind):
376 n = encodelist(nodes)
376 n = encodelist(nodes)
377 f = self._callcompressable("changegroup", roots=n)
377 f = self._callcompressable("changegroup", roots=n)
378 return changegroupmod.cg1unpacker(f, 'UN')
378 return changegroupmod.cg1unpacker(f, 'UN')
379
379
380 def changegroupsubset(self, bases, heads, kind):
380 def changegroupsubset(self, bases, heads, kind):
381 self.requirecap('changegroupsubset', _('look up remote changes'))
381 self.requirecap('changegroupsubset', _('look up remote changes'))
382 bases = encodelist(bases)
382 bases = encodelist(bases)
383 heads = encodelist(heads)
383 heads = encodelist(heads)
384 f = self._callcompressable("changegroupsubset",
384 f = self._callcompressable("changegroupsubset",
385 bases=bases, heads=heads)
385 bases=bases, heads=heads)
386 return changegroupmod.cg1unpacker(f, 'UN')
386 return changegroupmod.cg1unpacker(f, 'UN')
387
387
388 # End of baselegacywirepeer interface.
388 # End of baselegacywirepeer interface.
389
389
390 def _submitbatch(self, req):
390 def _submitbatch(self, req):
391 """run batch request <req> on the server
391 """run batch request <req> on the server
392
392
393 Returns an iterator of the raw responses from the server.
393 Returns an iterator of the raw responses from the server.
394 """
394 """
395 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
395 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
396 chunk = rsp.read(1024)
396 chunk = rsp.read(1024)
397 work = [chunk]
397 work = [chunk]
398 while chunk:
398 while chunk:
399 while ';' not in chunk and chunk:
399 while ';' not in chunk and chunk:
400 chunk = rsp.read(1024)
400 chunk = rsp.read(1024)
401 work.append(chunk)
401 work.append(chunk)
402 merged = ''.join(work)
402 merged = ''.join(work)
403 while ';' in merged:
403 while ';' in merged:
404 one, merged = merged.split(';', 1)
404 one, merged = merged.split(';', 1)
405 yield unescapearg(one)
405 yield unescapearg(one)
406 chunk = rsp.read(1024)
406 chunk = rsp.read(1024)
407 work = [merged, chunk]
407 work = [merged, chunk]
408 yield unescapearg(''.join(work))
408 yield unescapearg(''.join(work))
409
409
410 def _submitone(self, op, args):
410 def _submitone(self, op, args):
411 return self._call(op, **pycompat.strkwargs(args))
411 return self._call(op, **pycompat.strkwargs(args))
412
412
413 def debugwireargs(self, one, two, three=None, four=None, five=None):
413 def debugwireargs(self, one, two, three=None, four=None, five=None):
414 # don't pass optional arguments left at their default value
414 # don't pass optional arguments left at their default value
415 opts = {}
415 opts = {}
416 if three is not None:
416 if three is not None:
417 opts[r'three'] = three
417 opts[r'three'] = three
418 if four is not None:
418 if four is not None:
419 opts[r'four'] = four
419 opts[r'four'] = four
420 return self._call('debugwireargs', one=one, two=two, **opts)
420 return self._call('debugwireargs', one=one, two=two, **opts)
421
421
422 def _call(self, cmd, **args):
422 def _call(self, cmd, **args):
423 """execute <cmd> on the server
423 """execute <cmd> on the server
424
424
425 The command is expected to return a simple string.
425 The command is expected to return a simple string.
426
426
427 returns the server reply as a string."""
427 returns the server reply as a string."""
428 raise NotImplementedError()
428 raise NotImplementedError()
429
429
430 def _callstream(self, cmd, **args):
430 def _callstream(self, cmd, **args):
431 """execute <cmd> on the server
431 """execute <cmd> on the server
432
432
433 The command is expected to return a stream. Note that if the
433 The command is expected to return a stream. Note that if the
434 command doesn't return a stream, _callstream behaves
434 command doesn't return a stream, _callstream behaves
435 differently for ssh and http peers.
435 differently for ssh and http peers.
436
436
437 returns the server reply as a file like object.
437 returns the server reply as a file like object.
438 """
438 """
439 raise NotImplementedError()
439 raise NotImplementedError()
440
440
441 def _callcompressable(self, cmd, **args):
441 def _callcompressable(self, cmd, **args):
442 """execute <cmd> on the server
442 """execute <cmd> on the server
443
443
444 The command is expected to return a stream.
444 The command is expected to return a stream.
445
445
446 The stream may have been compressed in some implementations. This
446 The stream may have been compressed in some implementations. This
447 function takes care of the decompression. This is the only difference
447 function takes care of the decompression. This is the only difference
448 with _callstream.
448 with _callstream.
449
449
450 returns the server reply as a file like object.
450 returns the server reply as a file like object.
451 """
451 """
452 raise NotImplementedError()
452 raise NotImplementedError()
453
453
454 def _callpush(self, cmd, fp, **args):
454 def _callpush(self, cmd, fp, **args):
455 """execute a <cmd> on server
455 """execute a <cmd> on server
456
456
457 The command is expected to be related to a push. Push has a special
457 The command is expected to be related to a push. Push has a special
458 return method.
458 return method.
459
459
460 returns the server reply as a (ret, output) tuple. ret is either
460 returns the server reply as a (ret, output) tuple. ret is either
461 empty (error) or a stringified int.
461 empty (error) or a stringified int.
462 """
462 """
463 raise NotImplementedError()
463 raise NotImplementedError()
464
464
465 def _calltwowaystream(self, cmd, fp, **args):
465 def _calltwowaystream(self, cmd, fp, **args):
466 """execute <cmd> on server
466 """execute <cmd> on server
467
467
468 The command will send a stream to the server and get a stream in reply.
468 The command will send a stream to the server and get a stream in reply.
469 """
469 """
470 raise NotImplementedError()
470 raise NotImplementedError()
471
471
472 def _abort(self, exception):
472 def _abort(self, exception):
473 """clearly abort the wire protocol connection and raise the exception
473 """clearly abort the wire protocol connection and raise the exception
474 """
474 """
475 raise NotImplementedError()
475 raise NotImplementedError()
476
476
477 # server side
477 # server side
478
478
479 # wire protocol command can either return a string or one of these classes.
479 # wire protocol command can either return a string or one of these classes.
480 class streamres(object):
480 class streamres(object):
481 """wireproto reply: binary stream
481 """wireproto reply: binary stream
482
482
483 The call was successful and the result is a stream.
483 The call was successful and the result is a stream.
484
484
485 Accepts a generator containing chunks of data to be sent to the client.
485 Accepts a generator containing chunks of data to be sent to the client.
486
486
487 ``prefer_uncompressed`` indicates that the data is expected to be
487 ``prefer_uncompressed`` indicates that the data is expected to be
488 uncompressable and that the stream should therefore use the ``none``
488 uncompressable and that the stream should therefore use the ``none``
489 engine.
489 engine.
490 """
490 """
491 def __init__(self, gen=None, prefer_uncompressed=False):
491 def __init__(self, gen=None, prefer_uncompressed=False):
492 self.gen = gen
492 self.gen = gen
493 self.prefer_uncompressed = prefer_uncompressed
493 self.prefer_uncompressed = prefer_uncompressed
494
494
495 class streamres_legacy(object):
495 class streamres_legacy(object):
496 """wireproto reply: uncompressed binary stream
496 """wireproto reply: uncompressed binary stream
497
497
498 The call was successful and the result is a stream.
498 The call was successful and the result is a stream.
499
499
500 Accepts a generator containing chunks of data to be sent to the client.
500 Accepts a generator containing chunks of data to be sent to the client.
501
501
502 Like ``streamres``, but sends an uncompressed data for "version 1" clients
502 Like ``streamres``, but sends an uncompressed data for "version 1" clients
503 using the application/mercurial-0.1 media type.
503 using the application/mercurial-0.1 media type.
504 """
504 """
505 def __init__(self, gen=None):
505 def __init__(self, gen=None):
506 self.gen = gen
506 self.gen = gen
507
507
508 class pushres(object):
508 class pushres(object):
509 """wireproto reply: success with simple integer return
509 """wireproto reply: success with simple integer return
510
510
511 The call was successful and returned an integer contained in `self.res`.
511 The call was successful and returned an integer contained in `self.res`.
512 """
512 """
513 def __init__(self, res):
513 def __init__(self, res):
514 self.res = res
514 self.res = res
515
515
516 class pusherr(object):
516 class pusherr(object):
517 """wireproto reply: failure
517 """wireproto reply: failure
518
518
519 The call failed. The `self.res` attribute contains the error message.
519 The call failed. The `self.res` attribute contains the error message.
520 """
520 """
521 def __init__(self, res):
521 def __init__(self, res):
522 self.res = res
522 self.res = res
523
523
524 class ooberror(object):
524 class ooberror(object):
525 """wireproto reply: failure of a batch of operation
525 """wireproto reply: failure of a batch of operation
526
526
527 Something failed during a batch call. The error message is stored in
527 Something failed during a batch call. The error message is stored in
528 `self.message`.
528 `self.message`.
529 """
529 """
530 def __init__(self, message):
530 def __init__(self, message):
531 self.message = message
531 self.message = message
532
532
533 def getdispatchrepo(repo, proto, command):
533 def getdispatchrepo(repo, proto, command):
534 """Obtain the repo used for processing wire protocol commands.
534 """Obtain the repo used for processing wire protocol commands.
535
535
536 The intent of this function is to serve as a monkeypatch point for
536 The intent of this function is to serve as a monkeypatch point for
537 extensions that need commands to operate on different repo views under
537 extensions that need commands to operate on different repo views under
538 specialized circumstances.
538 specialized circumstances.
539 """
539 """
540 return repo.filtered('served')
540 return repo.filtered('served')
541
541
542 def dispatch(repo, proto, command):
542 def dispatch(repo, proto, command):
543 repo = getdispatchrepo(repo, proto, command)
543 repo = getdispatchrepo(repo, proto, command)
544 func, spec = commands[command]
544 func, spec = commands[command]
545 args = proto.getargs(spec)
545 args = proto.getargs(spec)
546 return func(repo, proto, *args)
546 return func(repo, proto, *args)
547
547
548 def options(cmd, keys, others):
548 def options(cmd, keys, others):
549 opts = {}
549 opts = {}
550 for k in keys:
550 for k in keys:
551 if k in others:
551 if k in others:
552 opts[k] = others[k]
552 opts[k] = others[k]
553 del others[k]
553 del others[k]
554 if others:
554 if others:
555 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
555 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
556 % (cmd, ",".join(others)))
556 % (cmd, ",".join(others)))
557 return opts
557 return opts
558
558
559 def bundle1allowed(repo, action):
559 def bundle1allowed(repo, action):
560 """Whether a bundle1 operation is allowed from the server.
560 """Whether a bundle1 operation is allowed from the server.
561
561
562 Priority is:
562 Priority is:
563
563
564 1. server.bundle1gd.<action> (if generaldelta active)
564 1. server.bundle1gd.<action> (if generaldelta active)
565 2. server.bundle1.<action>
565 2. server.bundle1.<action>
566 3. server.bundle1gd (if generaldelta active)
566 3. server.bundle1gd (if generaldelta active)
567 4. server.bundle1
567 4. server.bundle1
568 """
568 """
569 ui = repo.ui
569 ui = repo.ui
570 gd = 'generaldelta' in repo.requirements
570 gd = 'generaldelta' in repo.requirements
571
571
572 if gd:
572 if gd:
573 v = ui.configbool('server', 'bundle1gd.%s' % action)
573 v = ui.configbool('server', 'bundle1gd.%s' % action)
574 if v is not None:
574 if v is not None:
575 return v
575 return v
576
576
577 v = ui.configbool('server', 'bundle1.%s' % action)
577 v = ui.configbool('server', 'bundle1.%s' % action)
578 if v is not None:
578 if v is not None:
579 return v
579 return v
580
580
581 if gd:
581 if gd:
582 v = ui.configbool('server', 'bundle1gd')
582 v = ui.configbool('server', 'bundle1gd')
583 if v is not None:
583 if v is not None:
584 return v
584 return v
585
585
586 return ui.configbool('server', 'bundle1')
586 return ui.configbool('server', 'bundle1')
587
587
588 def supportedcompengines(ui, proto, role):
588 def supportedcompengines(ui, proto, role):
589 """Obtain the list of supported compression engines for a request."""
589 """Obtain the list of supported compression engines for a request."""
590 assert role in (util.CLIENTROLE, util.SERVERROLE)
590 assert role in (util.CLIENTROLE, util.SERVERROLE)
591
591
592 compengines = util.compengines.supportedwireengines(role)
592 compengines = util.compengines.supportedwireengines(role)
593
593
594 # Allow config to override default list and ordering.
594 # Allow config to override default list and ordering.
595 if role == util.SERVERROLE:
595 if role == util.SERVERROLE:
596 configengines = ui.configlist('server', 'compressionengines')
596 configengines = ui.configlist('server', 'compressionengines')
597 config = 'server.compressionengines'
597 config = 'server.compressionengines'
598 else:
598 else:
599 # This is currently implemented mainly to facilitate testing. In most
599 # This is currently implemented mainly to facilitate testing. In most
600 # cases, the server should be in charge of choosing a compression engine
600 # cases, the server should be in charge of choosing a compression engine
601 # because a server has the most to lose from a sub-optimal choice. (e.g.
601 # because a server has the most to lose from a sub-optimal choice. (e.g.
602 # CPU DoS due to an expensive engine or a network DoS due to poor
602 # CPU DoS due to an expensive engine or a network DoS due to poor
603 # compression ratio).
603 # compression ratio).
604 configengines = ui.configlist('experimental',
604 configengines = ui.configlist('experimental',
605 'clientcompressionengines')
605 'clientcompressionengines')
606 config = 'experimental.clientcompressionengines'
606 config = 'experimental.clientcompressionengines'
607
607
608 # No explicit config. Filter out the ones that aren't supposed to be
608 # No explicit config. Filter out the ones that aren't supposed to be
609 # advertised and return default ordering.
609 # advertised and return default ordering.
610 if not configengines:
610 if not configengines:
611 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
611 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
612 return [e for e in compengines
612 return [e for e in compengines
613 if getattr(e.wireprotosupport(), attr) > 0]
613 if getattr(e.wireprotosupport(), attr) > 0]
614
614
615 # If compression engines are listed in the config, assume there is a good
615 # If compression engines are listed in the config, assume there is a good
616 # reason for it (like server operators wanting to achieve specific
616 # reason for it (like server operators wanting to achieve specific
617 # performance characteristics). So fail fast if the config references
617 # performance characteristics). So fail fast if the config references
618 # unusable compression engines.
618 # unusable compression engines.
619 validnames = set(e.name() for e in compengines)
619 validnames = set(e.name() for e in compengines)
620 invalidnames = set(e for e in configengines if e not in validnames)
620 invalidnames = set(e for e in configengines if e not in validnames)
621 if invalidnames:
621 if invalidnames:
622 raise error.Abort(_('invalid compression engine defined in %s: %s') %
622 raise error.Abort(_('invalid compression engine defined in %s: %s') %
623 (config, ', '.join(sorted(invalidnames))))
623 (config, ', '.join(sorted(invalidnames))))
624
624
625 compengines = [e for e in compengines if e.name() in configengines]
625 compengines = [e for e in compengines if e.name() in configengines]
626 compengines = sorted(compengines,
626 compengines = sorted(compengines,
627 key=lambda e: configengines.index(e.name()))
627 key=lambda e: configengines.index(e.name()))
628
628
629 if not compengines:
629 if not compengines:
630 raise error.Abort(_('%s config option does not specify any known '
630 raise error.Abort(_('%s config option does not specify any known '
631 'compression engines') % config,
631 'compression engines') % config,
632 hint=_('usable compression engines: %s') %
632 hint=_('usable compression engines: %s') %
633 ', '.sorted(validnames))
633 ', '.sorted(validnames))
634
634
635 return compengines
635 return compengines
636
636
637 # list of commands
637 class commandentry(object):
638 commands = {}
638 """Represents a declared wire protocol command."""
639 def __init__(self, func, args=''):
640 self.func = func
641 self.args = args
642
643 def _merge(self, func, args):
644 """Merge this instance with an incoming 2-tuple.
645
646 This is called when a caller using the old 2-tuple API attempts
647 to replace an instance. The incoming values are merged with
648 data not captured by the 2-tuple and a new instance containing
649 the union of the two objects is returned.
650 """
651 return commandentry(func, args)
652
653 # Old code treats instances as 2-tuples. So expose that interface.
654 def __iter__(self):
655 yield self.func
656 yield self.args
657
658 def __getitem__(self, i):
659 if i == 0:
660 return self.func
661 elif i == 1:
662 return self.args
663 else:
664 raise IndexError('can only access elements 0 and 1')
665
666 class commanddict(dict):
667 """Container for registered wire protocol commands.
668
669 It behaves like a dict. But __setitem__ is overwritten to allow silent
670 coercion of values from 2-tuples for API compatibility.
671 """
672 def __setitem__(self, k, v):
673 if isinstance(v, commandentry):
674 pass
675 # Cast 2-tuples to commandentry instances.
676 elif isinstance(v, tuple):
677 if len(v) != 2:
678 raise ValueError('command tuples must have exactly 2 elements')
679
680 # It is common for extensions to wrap wire protocol commands via
681 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
682 # doing this aren't aware of the new API that uses objects to store
683 # command entries, we automatically merge old state with new.
684 if k in self:
685 v = self[k]._merge(v[0], v[1])
686 else:
687 v = commandentry(v[0], v[1])
688 else:
689 raise ValueError('command entries must be commandentry instances '
690 'or 2-tuples')
691
692 return super(commanddict, self).__setitem__(k, v)
693
694 commands = commanddict()
639
695
640 def wireprotocommand(name, args=''):
696 def wireprotocommand(name, args=''):
641 """Decorator to declare a wire protocol command.
697 """Decorator to declare a wire protocol command.
642
698
643 ``name`` is the name of the wire protocol command being provided.
699 ``name`` is the name of the wire protocol command being provided.
644
700
645 ``args`` is a space-delimited list of named arguments that the command
701 ``args`` is a space-delimited list of named arguments that the command
646 accepts. ``*`` is a special value that says to accept all arguments.
702 accepts. ``*`` is a special value that says to accept all arguments.
647 """
703 """
648 def register(func):
704 def register(func):
649 commands[name] = (func, args)
705 commands[name] = commandentry(func, args)
650 return func
706 return func
651 return register
707 return register
652
708
653 @wireprotocommand('batch', 'cmds *')
709 @wireprotocommand('batch', 'cmds *')
654 def batch(repo, proto, cmds, others):
710 def batch(repo, proto, cmds, others):
655 repo = repo.filtered("served")
711 repo = repo.filtered("served")
656 res = []
712 res = []
657 for pair in cmds.split(';'):
713 for pair in cmds.split(';'):
658 op, args = pair.split(' ', 1)
714 op, args = pair.split(' ', 1)
659 vals = {}
715 vals = {}
660 for a in args.split(','):
716 for a in args.split(','):
661 if a:
717 if a:
662 n, v = a.split('=')
718 n, v = a.split('=')
663 vals[unescapearg(n)] = unescapearg(v)
719 vals[unescapearg(n)] = unescapearg(v)
664 func, spec = commands[op]
720 func, spec = commands[op]
665 if spec:
721 if spec:
666 keys = spec.split()
722 keys = spec.split()
667 data = {}
723 data = {}
668 for k in keys:
724 for k in keys:
669 if k == '*':
725 if k == '*':
670 star = {}
726 star = {}
671 for key in vals.keys():
727 for key in vals.keys():
672 if key not in keys:
728 if key not in keys:
673 star[key] = vals[key]
729 star[key] = vals[key]
674 data['*'] = star
730 data['*'] = star
675 else:
731 else:
676 data[k] = vals[k]
732 data[k] = vals[k]
677 result = func(repo, proto, *[data[k] for k in keys])
733 result = func(repo, proto, *[data[k] for k in keys])
678 else:
734 else:
679 result = func(repo, proto)
735 result = func(repo, proto)
680 if isinstance(result, ooberror):
736 if isinstance(result, ooberror):
681 return result
737 return result
682 res.append(escapearg(result))
738 res.append(escapearg(result))
683 return ';'.join(res)
739 return ';'.join(res)
684
740
685 @wireprotocommand('between', 'pairs')
741 @wireprotocommand('between', 'pairs')
686 def between(repo, proto, pairs):
742 def between(repo, proto, pairs):
687 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
743 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
688 r = []
744 r = []
689 for b in repo.between(pairs):
745 for b in repo.between(pairs):
690 r.append(encodelist(b) + "\n")
746 r.append(encodelist(b) + "\n")
691 return "".join(r)
747 return "".join(r)
692
748
693 @wireprotocommand('branchmap')
749 @wireprotocommand('branchmap')
694 def branchmap(repo, proto):
750 def branchmap(repo, proto):
695 branchmap = repo.branchmap()
751 branchmap = repo.branchmap()
696 heads = []
752 heads = []
697 for branch, nodes in branchmap.iteritems():
753 for branch, nodes in branchmap.iteritems():
698 branchname = urlreq.quote(encoding.fromlocal(branch))
754 branchname = urlreq.quote(encoding.fromlocal(branch))
699 branchnodes = encodelist(nodes)
755 branchnodes = encodelist(nodes)
700 heads.append('%s %s' % (branchname, branchnodes))
756 heads.append('%s %s' % (branchname, branchnodes))
701 return '\n'.join(heads)
757 return '\n'.join(heads)
702
758
703 @wireprotocommand('branches', 'nodes')
759 @wireprotocommand('branches', 'nodes')
704 def branches(repo, proto, nodes):
760 def branches(repo, proto, nodes):
705 nodes = decodelist(nodes)
761 nodes = decodelist(nodes)
706 r = []
762 r = []
707 for b in repo.branches(nodes):
763 for b in repo.branches(nodes):
708 r.append(encodelist(b) + "\n")
764 r.append(encodelist(b) + "\n")
709 return "".join(r)
765 return "".join(r)
710
766
711 @wireprotocommand('clonebundles', '')
767 @wireprotocommand('clonebundles', '')
712 def clonebundles(repo, proto):
768 def clonebundles(repo, proto):
713 """Server command for returning info for available bundles to seed clones.
769 """Server command for returning info for available bundles to seed clones.
714
770
715 Clients will parse this response and determine what bundle to fetch.
771 Clients will parse this response and determine what bundle to fetch.
716
772
717 Extensions may wrap this command to filter or dynamically emit data
773 Extensions may wrap this command to filter or dynamically emit data
718 depending on the request. e.g. you could advertise URLs for the closest
774 depending on the request. e.g. you could advertise URLs for the closest
719 data center given the client's IP address.
775 data center given the client's IP address.
720 """
776 """
721 return repo.vfs.tryread('clonebundles.manifest')
777 return repo.vfs.tryread('clonebundles.manifest')
722
778
723 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
779 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
724 'known', 'getbundle', 'unbundlehash', 'batch']
780 'known', 'getbundle', 'unbundlehash', 'batch']
725
781
726 def _capabilities(repo, proto):
782 def _capabilities(repo, proto):
727 """return a list of capabilities for a repo
783 """return a list of capabilities for a repo
728
784
729 This function exists to allow extensions to easily wrap capabilities
785 This function exists to allow extensions to easily wrap capabilities
730 computation
786 computation
731
787
732 - returns a lists: easy to alter
788 - returns a lists: easy to alter
733 - change done here will be propagated to both `capabilities` and `hello`
789 - change done here will be propagated to both `capabilities` and `hello`
734 command without any other action needed.
790 command without any other action needed.
735 """
791 """
736 # copy to prevent modification of the global list
792 # copy to prevent modification of the global list
737 caps = list(wireprotocaps)
793 caps = list(wireprotocaps)
738 if streamclone.allowservergeneration(repo):
794 if streamclone.allowservergeneration(repo):
739 if repo.ui.configbool('server', 'preferuncompressed'):
795 if repo.ui.configbool('server', 'preferuncompressed'):
740 caps.append('stream-preferred')
796 caps.append('stream-preferred')
741 requiredformats = repo.requirements & repo.supportedformats
797 requiredformats = repo.requirements & repo.supportedformats
742 # if our local revlogs are just revlogv1, add 'stream' cap
798 # if our local revlogs are just revlogv1, add 'stream' cap
743 if not requiredformats - {'revlogv1'}:
799 if not requiredformats - {'revlogv1'}:
744 caps.append('stream')
800 caps.append('stream')
745 # otherwise, add 'streamreqs' detailing our local revlog format
801 # otherwise, add 'streamreqs' detailing our local revlog format
746 else:
802 else:
747 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
803 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
748 if repo.ui.configbool('experimental', 'bundle2-advertise'):
804 if repo.ui.configbool('experimental', 'bundle2-advertise'):
749 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
805 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
750 caps.append('bundle2=' + urlreq.quote(capsblob))
806 caps.append('bundle2=' + urlreq.quote(capsblob))
751 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
807 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
752
808
753 if proto.name == 'http':
809 if proto.name == 'http':
754 caps.append('httpheader=%d' %
810 caps.append('httpheader=%d' %
755 repo.ui.configint('server', 'maxhttpheaderlen'))
811 repo.ui.configint('server', 'maxhttpheaderlen'))
756 if repo.ui.configbool('experimental', 'httppostargs'):
812 if repo.ui.configbool('experimental', 'httppostargs'):
757 caps.append('httppostargs')
813 caps.append('httppostargs')
758
814
759 # FUTURE advertise 0.2rx once support is implemented
815 # FUTURE advertise 0.2rx once support is implemented
760 # FUTURE advertise minrx and mintx after consulting config option
816 # FUTURE advertise minrx and mintx after consulting config option
761 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
817 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
762
818
763 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
819 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
764 if compengines:
820 if compengines:
765 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
821 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
766 for e in compengines)
822 for e in compengines)
767 caps.append('compression=%s' % comptypes)
823 caps.append('compression=%s' % comptypes)
768
824
769 return caps
825 return caps
770
826
771 # If you are writing an extension and consider wrapping this function. Wrap
827 # If you are writing an extension and consider wrapping this function. Wrap
772 # `_capabilities` instead.
828 # `_capabilities` instead.
773 @wireprotocommand('capabilities')
829 @wireprotocommand('capabilities')
774 def capabilities(repo, proto):
830 def capabilities(repo, proto):
775 return ' '.join(_capabilities(repo, proto))
831 return ' '.join(_capabilities(repo, proto))
776
832
777 @wireprotocommand('changegroup', 'roots')
833 @wireprotocommand('changegroup', 'roots')
778 def changegroup(repo, proto, roots):
834 def changegroup(repo, proto, roots):
779 nodes = decodelist(roots)
835 nodes = decodelist(roots)
780 outgoing = discovery.outgoing(repo, missingroots=nodes,
836 outgoing = discovery.outgoing(repo, missingroots=nodes,
781 missingheads=repo.heads())
837 missingheads=repo.heads())
782 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
838 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
783 gen = iter(lambda: cg.read(32768), '')
839 gen = iter(lambda: cg.read(32768), '')
784 return streamres(gen=gen)
840 return streamres(gen=gen)
785
841
786 @wireprotocommand('changegroupsubset', 'bases heads')
842 @wireprotocommand('changegroupsubset', 'bases heads')
787 def changegroupsubset(repo, proto, bases, heads):
843 def changegroupsubset(repo, proto, bases, heads):
788 bases = decodelist(bases)
844 bases = decodelist(bases)
789 heads = decodelist(heads)
845 heads = decodelist(heads)
790 outgoing = discovery.outgoing(repo, missingroots=bases,
846 outgoing = discovery.outgoing(repo, missingroots=bases,
791 missingheads=heads)
847 missingheads=heads)
792 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
848 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
793 gen = iter(lambda: cg.read(32768), '')
849 gen = iter(lambda: cg.read(32768), '')
794 return streamres(gen=gen)
850 return streamres(gen=gen)
795
851
796 @wireprotocommand('debugwireargs', 'one two *')
852 @wireprotocommand('debugwireargs', 'one two *')
797 def debugwireargs(repo, proto, one, two, others):
853 def debugwireargs(repo, proto, one, two, others):
798 # only accept optional args from the known set
854 # only accept optional args from the known set
799 opts = options('debugwireargs', ['three', 'four'], others)
855 opts = options('debugwireargs', ['three', 'four'], others)
800 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
856 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
801
857
802 @wireprotocommand('getbundle', '*')
858 @wireprotocommand('getbundle', '*')
803 def getbundle(repo, proto, others):
859 def getbundle(repo, proto, others):
804 opts = options('getbundle', gboptsmap.keys(), others)
860 opts = options('getbundle', gboptsmap.keys(), others)
805 for k, v in opts.iteritems():
861 for k, v in opts.iteritems():
806 keytype = gboptsmap[k]
862 keytype = gboptsmap[k]
807 if keytype == 'nodes':
863 if keytype == 'nodes':
808 opts[k] = decodelist(v)
864 opts[k] = decodelist(v)
809 elif keytype == 'csv':
865 elif keytype == 'csv':
810 opts[k] = list(v.split(','))
866 opts[k] = list(v.split(','))
811 elif keytype == 'scsv':
867 elif keytype == 'scsv':
812 opts[k] = set(v.split(','))
868 opts[k] = set(v.split(','))
813 elif keytype == 'boolean':
869 elif keytype == 'boolean':
814 # Client should serialize False as '0', which is a non-empty string
870 # Client should serialize False as '0', which is a non-empty string
815 # so it evaluates as a True bool.
871 # so it evaluates as a True bool.
816 if v == '0':
872 if v == '0':
817 opts[k] = False
873 opts[k] = False
818 else:
874 else:
819 opts[k] = bool(v)
875 opts[k] = bool(v)
820 elif keytype != 'plain':
876 elif keytype != 'plain':
821 raise KeyError('unknown getbundle option type %s'
877 raise KeyError('unknown getbundle option type %s'
822 % keytype)
878 % keytype)
823
879
824 if not bundle1allowed(repo, 'pull'):
880 if not bundle1allowed(repo, 'pull'):
825 if not exchange.bundle2requested(opts.get('bundlecaps')):
881 if not exchange.bundle2requested(opts.get('bundlecaps')):
826 if proto.name == 'http':
882 if proto.name == 'http':
827 return ooberror(bundle2required)
883 return ooberror(bundle2required)
828 raise error.Abort(bundle2requiredmain,
884 raise error.Abort(bundle2requiredmain,
829 hint=bundle2requiredhint)
885 hint=bundle2requiredhint)
830
886
831 prefercompressed = True
887 prefercompressed = True
832
888
833 try:
889 try:
834 if repo.ui.configbool('server', 'disablefullbundle'):
890 if repo.ui.configbool('server', 'disablefullbundle'):
835 # Check to see if this is a full clone.
891 # Check to see if this is a full clone.
836 clheads = set(repo.changelog.heads())
892 clheads = set(repo.changelog.heads())
837 changegroup = opts.get('cg', True)
893 changegroup = opts.get('cg', True)
838 heads = set(opts.get('heads', set()))
894 heads = set(opts.get('heads', set()))
839 common = set(opts.get('common', set()))
895 common = set(opts.get('common', set()))
840 common.discard(nullid)
896 common.discard(nullid)
841 if changegroup and not common and clheads == heads:
897 if changegroup and not common and clheads == heads:
842 raise error.Abort(
898 raise error.Abort(
843 _('server has pull-based clones disabled'),
899 _('server has pull-based clones disabled'),
844 hint=_('remove --pull if specified or upgrade Mercurial'))
900 hint=_('remove --pull if specified or upgrade Mercurial'))
845
901
846 info, chunks = exchange.getbundlechunks(repo, 'serve',
902 info, chunks = exchange.getbundlechunks(repo, 'serve',
847 **pycompat.strkwargs(opts))
903 **pycompat.strkwargs(opts))
848 prefercompressed = info.get('prefercompressed', True)
904 prefercompressed = info.get('prefercompressed', True)
849 except error.Abort as exc:
905 except error.Abort as exc:
850 # cleanly forward Abort error to the client
906 # cleanly forward Abort error to the client
851 if not exchange.bundle2requested(opts.get('bundlecaps')):
907 if not exchange.bundle2requested(opts.get('bundlecaps')):
852 if proto.name == 'http':
908 if proto.name == 'http':
853 return ooberror(str(exc) + '\n')
909 return ooberror(str(exc) + '\n')
854 raise # cannot do better for bundle1 + ssh
910 raise # cannot do better for bundle1 + ssh
855 # bundle2 request expect a bundle2 reply
911 # bundle2 request expect a bundle2 reply
856 bundler = bundle2.bundle20(repo.ui)
912 bundler = bundle2.bundle20(repo.ui)
857 manargs = [('message', str(exc))]
913 manargs = [('message', str(exc))]
858 advargs = []
914 advargs = []
859 if exc.hint is not None:
915 if exc.hint is not None:
860 advargs.append(('hint', exc.hint))
916 advargs.append(('hint', exc.hint))
861 bundler.addpart(bundle2.bundlepart('error:abort',
917 bundler.addpart(bundle2.bundlepart('error:abort',
862 manargs, advargs))
918 manargs, advargs))
863 chunks = bundler.getchunks()
919 chunks = bundler.getchunks()
864 prefercompressed = False
920 prefercompressed = False
865
921
866 return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
922 return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
867
923
868 @wireprotocommand('heads')
924 @wireprotocommand('heads')
869 def heads(repo, proto):
925 def heads(repo, proto):
870 h = repo.heads()
926 h = repo.heads()
871 return encodelist(h) + "\n"
927 return encodelist(h) + "\n"
872
928
873 @wireprotocommand('hello')
929 @wireprotocommand('hello')
874 def hello(repo, proto):
930 def hello(repo, proto):
875 '''the hello command returns a set of lines describing various
931 '''the hello command returns a set of lines describing various
876 interesting things about the server, in an RFC822-like format.
932 interesting things about the server, in an RFC822-like format.
877 Currently the only one defined is "capabilities", which
933 Currently the only one defined is "capabilities", which
878 consists of a line in the form:
934 consists of a line in the form:
879
935
880 capabilities: space separated list of tokens
936 capabilities: space separated list of tokens
881 '''
937 '''
882 return "capabilities: %s\n" % (capabilities(repo, proto))
938 return "capabilities: %s\n" % (capabilities(repo, proto))
883
939
884 @wireprotocommand('listkeys', 'namespace')
940 @wireprotocommand('listkeys', 'namespace')
885 def listkeys(repo, proto, namespace):
941 def listkeys(repo, proto, namespace):
886 d = repo.listkeys(encoding.tolocal(namespace)).items()
942 d = repo.listkeys(encoding.tolocal(namespace)).items()
887 return pushkeymod.encodekeys(d)
943 return pushkeymod.encodekeys(d)
888
944
889 @wireprotocommand('lookup', 'key')
945 @wireprotocommand('lookup', 'key')
890 def lookup(repo, proto, key):
946 def lookup(repo, proto, key):
891 try:
947 try:
892 k = encoding.tolocal(key)
948 k = encoding.tolocal(key)
893 c = repo[k]
949 c = repo[k]
894 r = c.hex()
950 r = c.hex()
895 success = 1
951 success = 1
896 except Exception as inst:
952 except Exception as inst:
897 r = str(inst)
953 r = str(inst)
898 success = 0
954 success = 0
899 return "%d %s\n" % (success, r)
955 return "%d %s\n" % (success, r)
900
956
901 @wireprotocommand('known', 'nodes *')
957 @wireprotocommand('known', 'nodes *')
902 def known(repo, proto, nodes, others):
958 def known(repo, proto, nodes, others):
903 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
959 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
904
960
905 @wireprotocommand('pushkey', 'namespace key old new')
961 @wireprotocommand('pushkey', 'namespace key old new')
906 def pushkey(repo, proto, namespace, key, old, new):
962 def pushkey(repo, proto, namespace, key, old, new):
907 # compatibility with pre-1.8 clients which were accidentally
963 # compatibility with pre-1.8 clients which were accidentally
908 # sending raw binary nodes rather than utf-8-encoded hex
964 # sending raw binary nodes rather than utf-8-encoded hex
909 if len(new) == 20 and util.escapestr(new) != new:
965 if len(new) == 20 and util.escapestr(new) != new:
910 # looks like it could be a binary node
966 # looks like it could be a binary node
911 try:
967 try:
912 new.decode('utf-8')
968 new.decode('utf-8')
913 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
969 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
914 except UnicodeDecodeError:
970 except UnicodeDecodeError:
915 pass # binary, leave unmodified
971 pass # binary, leave unmodified
916 else:
972 else:
917 new = encoding.tolocal(new) # normal path
973 new = encoding.tolocal(new) # normal path
918
974
919 if util.safehasattr(proto, 'restore'):
975 if util.safehasattr(proto, 'restore'):
920
976
921 proto.redirect()
977 proto.redirect()
922
978
923 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
979 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
924 encoding.tolocal(old), new) or False
980 encoding.tolocal(old), new) or False
925
981
926 output = proto.restore()
982 output = proto.restore()
927
983
928 return '%s\n%s' % (int(r), output)
984 return '%s\n%s' % (int(r), output)
929
985
930 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
986 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
931 encoding.tolocal(old), new)
987 encoding.tolocal(old), new)
932 return '%s\n' % int(r)
988 return '%s\n' % int(r)
933
989
934 @wireprotocommand('stream_out')
990 @wireprotocommand('stream_out')
935 def stream(repo, proto):
991 def stream(repo, proto):
936 '''If the server supports streaming clone, it advertises the "stream"
992 '''If the server supports streaming clone, it advertises the "stream"
937 capability with a value representing the version and flags of the repo
993 capability with a value representing the version and flags of the repo
938 it is serving. Client checks to see if it understands the format.
994 it is serving. Client checks to see if it understands the format.
939 '''
995 '''
940 return streamres_legacy(streamclone.generatev1wireproto(repo))
996 return streamres_legacy(streamclone.generatev1wireproto(repo))
941
997
942 @wireprotocommand('unbundle', 'heads')
998 @wireprotocommand('unbundle', 'heads')
943 def unbundle(repo, proto, heads):
999 def unbundle(repo, proto, heads):
944 their_heads = decodelist(heads)
1000 their_heads = decodelist(heads)
945
1001
946 try:
1002 try:
947 proto.redirect()
1003 proto.redirect()
948
1004
949 exchange.check_heads(repo, their_heads, 'preparing changes')
1005 exchange.check_heads(repo, their_heads, 'preparing changes')
950
1006
951 # write bundle data to temporary file because it can be big
1007 # write bundle data to temporary file because it can be big
952 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1008 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
953 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1009 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
954 r = 0
1010 r = 0
955 try:
1011 try:
956 proto.getfile(fp)
1012 proto.getfile(fp)
957 fp.seek(0)
1013 fp.seek(0)
958 gen = exchange.readbundle(repo.ui, fp, None)
1014 gen = exchange.readbundle(repo.ui, fp, None)
959 if (isinstance(gen, changegroupmod.cg1unpacker)
1015 if (isinstance(gen, changegroupmod.cg1unpacker)
960 and not bundle1allowed(repo, 'push')):
1016 and not bundle1allowed(repo, 'push')):
961 if proto.name == 'http':
1017 if proto.name == 'http':
962 # need to special case http because stderr do not get to
1018 # need to special case http because stderr do not get to
963 # the http client on failed push so we need to abuse some
1019 # the http client on failed push so we need to abuse some
964 # other error type to make sure the message get to the
1020 # other error type to make sure the message get to the
965 # user.
1021 # user.
966 return ooberror(bundle2required)
1022 return ooberror(bundle2required)
967 raise error.Abort(bundle2requiredmain,
1023 raise error.Abort(bundle2requiredmain,
968 hint=bundle2requiredhint)
1024 hint=bundle2requiredhint)
969
1025
970 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1026 r = exchange.unbundle(repo, gen, their_heads, 'serve',
971 proto._client())
1027 proto._client())
972 if util.safehasattr(r, 'addpart'):
1028 if util.safehasattr(r, 'addpart'):
973 # The return looks streamable, we are in the bundle2 case and
1029 # The return looks streamable, we are in the bundle2 case and
974 # should return a stream.
1030 # should return a stream.
975 return streamres_legacy(gen=r.getchunks())
1031 return streamres_legacy(gen=r.getchunks())
976 return pushres(r)
1032 return pushres(r)
977
1033
978 finally:
1034 finally:
979 fp.close()
1035 fp.close()
980 os.unlink(tempname)
1036 os.unlink(tempname)
981
1037
982 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1038 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
983 # handle non-bundle2 case first
1039 # handle non-bundle2 case first
984 if not getattr(exc, 'duringunbundle2', False):
1040 if not getattr(exc, 'duringunbundle2', False):
985 try:
1041 try:
986 raise
1042 raise
987 except error.Abort:
1043 except error.Abort:
988 # The old code we moved used util.stderr directly.
1044 # The old code we moved used util.stderr directly.
989 # We did not change it to minimise code change.
1045 # We did not change it to minimise code change.
990 # This need to be moved to something proper.
1046 # This need to be moved to something proper.
991 # Feel free to do it.
1047 # Feel free to do it.
992 util.stderr.write("abort: %s\n" % exc)
1048 util.stderr.write("abort: %s\n" % exc)
993 if exc.hint is not None:
1049 if exc.hint is not None:
994 util.stderr.write("(%s)\n" % exc.hint)
1050 util.stderr.write("(%s)\n" % exc.hint)
995 return pushres(0)
1051 return pushres(0)
996 except error.PushRaced:
1052 except error.PushRaced:
997 return pusherr(str(exc))
1053 return pusherr(str(exc))
998
1054
999 bundler = bundle2.bundle20(repo.ui)
1055 bundler = bundle2.bundle20(repo.ui)
1000 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1056 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1001 bundler.addpart(out)
1057 bundler.addpart(out)
1002 try:
1058 try:
1003 try:
1059 try:
1004 raise
1060 raise
1005 except error.PushkeyFailed as exc:
1061 except error.PushkeyFailed as exc:
1006 # check client caps
1062 # check client caps
1007 remotecaps = getattr(exc, '_replycaps', None)
1063 remotecaps = getattr(exc, '_replycaps', None)
1008 if (remotecaps is not None
1064 if (remotecaps is not None
1009 and 'pushkey' not in remotecaps.get('error', ())):
1065 and 'pushkey' not in remotecaps.get('error', ())):
1010 # no support remote side, fallback to Abort handler.
1066 # no support remote side, fallback to Abort handler.
1011 raise
1067 raise
1012 part = bundler.newpart('error:pushkey')
1068 part = bundler.newpart('error:pushkey')
1013 part.addparam('in-reply-to', exc.partid)
1069 part.addparam('in-reply-to', exc.partid)
1014 if exc.namespace is not None:
1070 if exc.namespace is not None:
1015 part.addparam('namespace', exc.namespace, mandatory=False)
1071 part.addparam('namespace', exc.namespace, mandatory=False)
1016 if exc.key is not None:
1072 if exc.key is not None:
1017 part.addparam('key', exc.key, mandatory=False)
1073 part.addparam('key', exc.key, mandatory=False)
1018 if exc.new is not None:
1074 if exc.new is not None:
1019 part.addparam('new', exc.new, mandatory=False)
1075 part.addparam('new', exc.new, mandatory=False)
1020 if exc.old is not None:
1076 if exc.old is not None:
1021 part.addparam('old', exc.old, mandatory=False)
1077 part.addparam('old', exc.old, mandatory=False)
1022 if exc.ret is not None:
1078 if exc.ret is not None:
1023 part.addparam('ret', exc.ret, mandatory=False)
1079 part.addparam('ret', exc.ret, mandatory=False)
1024 except error.BundleValueError as exc:
1080 except error.BundleValueError as exc:
1025 errpart = bundler.newpart('error:unsupportedcontent')
1081 errpart = bundler.newpart('error:unsupportedcontent')
1026 if exc.parttype is not None:
1082 if exc.parttype is not None:
1027 errpart.addparam('parttype', exc.parttype)
1083 errpart.addparam('parttype', exc.parttype)
1028 if exc.params:
1084 if exc.params:
1029 errpart.addparam('params', '\0'.join(exc.params))
1085 errpart.addparam('params', '\0'.join(exc.params))
1030 except error.Abort as exc:
1086 except error.Abort as exc:
1031 manargs = [('message', str(exc))]
1087 manargs = [('message', str(exc))]
1032 advargs = []
1088 advargs = []
1033 if exc.hint is not None:
1089 if exc.hint is not None:
1034 advargs.append(('hint', exc.hint))
1090 advargs.append(('hint', exc.hint))
1035 bundler.addpart(bundle2.bundlepart('error:abort',
1091 bundler.addpart(bundle2.bundlepart('error:abort',
1036 manargs, advargs))
1092 manargs, advargs))
1037 except error.PushRaced as exc:
1093 except error.PushRaced as exc:
1038 bundler.newpart('error:pushraced', [('message', str(exc))])
1094 bundler.newpart('error:pushraced', [('message', str(exc))])
1039 return streamres_legacy(gen=bundler.getchunks())
1095 return streamres_legacy(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now