##// END OF EJS Templates
wireproto: use repo.lookup() for lookup command...
Martin von Zweigbergk -
r37371:ac666c5c default
parent child Browse files
Show More
@@ -1,1163 +1,1163 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import os
11 import os
12 import tempfile
12 import tempfile
13
13
14 from .i18n import _
14 from .i18n import _
15 from .node import (
15 from .node import (
16 bin,
16 bin,
17 hex,
17 hex,
18 nullid,
18 nullid,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 discovery,
24 discovery,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 pycompat,
30 pycompat,
31 repository,
31 repository,
32 streamclone,
32 streamclone,
33 util,
33 util,
34 wireprototypes,
34 wireprototypes,
35 )
35 )
36
36
37 from .utils import (
37 from .utils import (
38 procutil,
38 procutil,
39 stringutil,
39 stringutil,
40 )
40 )
41
41
42 urlerr = util.urlerr
42 urlerr = util.urlerr
43 urlreq = util.urlreq
43 urlreq = util.urlreq
44
44
45 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
45 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
46 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
46 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
47 'IncompatibleClient')
47 'IncompatibleClient')
48 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
48 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
49
49
50 class remoteiterbatcher(peer.iterbatcher):
50 class remoteiterbatcher(peer.iterbatcher):
51 def __init__(self, remote):
51 def __init__(self, remote):
52 super(remoteiterbatcher, self).__init__()
52 super(remoteiterbatcher, self).__init__()
53 self._remote = remote
53 self._remote = remote
54
54
55 def __getattr__(self, name):
55 def __getattr__(self, name):
56 # Validate this method is batchable, since submit() only supports
56 # Validate this method is batchable, since submit() only supports
57 # batchable methods.
57 # batchable methods.
58 fn = getattr(self._remote, name)
58 fn = getattr(self._remote, name)
59 if not getattr(fn, 'batchable', None):
59 if not getattr(fn, 'batchable', None):
60 raise error.ProgrammingError('Attempted to batch a non-batchable '
60 raise error.ProgrammingError('Attempted to batch a non-batchable '
61 'call to %r' % name)
61 'call to %r' % name)
62
62
63 return super(remoteiterbatcher, self).__getattr__(name)
63 return super(remoteiterbatcher, self).__getattr__(name)
64
64
65 def submit(self):
65 def submit(self):
66 """Break the batch request into many patch calls and pipeline them.
66 """Break the batch request into many patch calls and pipeline them.
67
67
68 This is mostly valuable over http where request sizes can be
68 This is mostly valuable over http where request sizes can be
69 limited, but can be used in other places as well.
69 limited, but can be used in other places as well.
70 """
70 """
71 # 2-tuple of (command, arguments) that represents what will be
71 # 2-tuple of (command, arguments) that represents what will be
72 # sent over the wire.
72 # sent over the wire.
73 requests = []
73 requests = []
74
74
75 # 4-tuple of (command, final future, @batchable generator, remote
75 # 4-tuple of (command, final future, @batchable generator, remote
76 # future).
76 # future).
77 results = []
77 results = []
78
78
79 for command, args, opts, finalfuture in self.calls:
79 for command, args, opts, finalfuture in self.calls:
80 mtd = getattr(self._remote, command)
80 mtd = getattr(self._remote, command)
81 batchable = mtd.batchable(mtd.__self__, *args, **opts)
81 batchable = mtd.batchable(mtd.__self__, *args, **opts)
82
82
83 commandargs, fremote = next(batchable)
83 commandargs, fremote = next(batchable)
84 assert fremote
84 assert fremote
85 requests.append((command, commandargs))
85 requests.append((command, commandargs))
86 results.append((command, finalfuture, batchable, fremote))
86 results.append((command, finalfuture, batchable, fremote))
87
87
88 if requests:
88 if requests:
89 self._resultiter = self._remote._submitbatch(requests)
89 self._resultiter = self._remote._submitbatch(requests)
90
90
91 self._results = results
91 self._results = results
92
92
93 def results(self):
93 def results(self):
94 for command, finalfuture, batchable, remotefuture in self._results:
94 for command, finalfuture, batchable, remotefuture in self._results:
95 # Get the raw result, set it in the remote future, feed it
95 # Get the raw result, set it in the remote future, feed it
96 # back into the @batchable generator so it can be decoded, and
96 # back into the @batchable generator so it can be decoded, and
97 # set the result on the final future to this value.
97 # set the result on the final future to this value.
98 remoteresult = next(self._resultiter)
98 remoteresult = next(self._resultiter)
99 remotefuture.set(remoteresult)
99 remotefuture.set(remoteresult)
100 finalfuture.set(next(batchable))
100 finalfuture.set(next(batchable))
101
101
102 # Verify our @batchable generators only emit 2 values.
102 # Verify our @batchable generators only emit 2 values.
103 try:
103 try:
104 next(batchable)
104 next(batchable)
105 except StopIteration:
105 except StopIteration:
106 pass
106 pass
107 else:
107 else:
108 raise error.ProgrammingError('%s @batchable generator emitted '
108 raise error.ProgrammingError('%s @batchable generator emitted '
109 'unexpected value count' % command)
109 'unexpected value count' % command)
110
110
111 yield finalfuture.value
111 yield finalfuture.value
112
112
113 # Forward a couple of names from peer to make wireproto interactions
113 # Forward a couple of names from peer to make wireproto interactions
114 # slightly more sensible.
114 # slightly more sensible.
115 batchable = peer.batchable
115 batchable = peer.batchable
116 future = peer.future
116 future = peer.future
117
117
118 # list of nodes encoding / decoding
118 # list of nodes encoding / decoding
119
119
120 def decodelist(l, sep=' '):
120 def decodelist(l, sep=' '):
121 if l:
121 if l:
122 return [bin(v) for v in l.split(sep)]
122 return [bin(v) for v in l.split(sep)]
123 return []
123 return []
124
124
125 def encodelist(l, sep=' '):
125 def encodelist(l, sep=' '):
126 try:
126 try:
127 return sep.join(map(hex, l))
127 return sep.join(map(hex, l))
128 except TypeError:
128 except TypeError:
129 raise
129 raise
130
130
131 # batched call argument encoding
131 # batched call argument encoding
132
132
133 def escapearg(plain):
133 def escapearg(plain):
134 return (plain
134 return (plain
135 .replace(':', ':c')
135 .replace(':', ':c')
136 .replace(',', ':o')
136 .replace(',', ':o')
137 .replace(';', ':s')
137 .replace(';', ':s')
138 .replace('=', ':e'))
138 .replace('=', ':e'))
139
139
140 def unescapearg(escaped):
140 def unescapearg(escaped):
141 return (escaped
141 return (escaped
142 .replace(':e', '=')
142 .replace(':e', '=')
143 .replace(':s', ';')
143 .replace(':s', ';')
144 .replace(':o', ',')
144 .replace(':o', ',')
145 .replace(':c', ':'))
145 .replace(':c', ':'))
146
146
147 def encodebatchcmds(req):
147 def encodebatchcmds(req):
148 """Return a ``cmds`` argument value for the ``batch`` command."""
148 """Return a ``cmds`` argument value for the ``batch`` command."""
149 cmds = []
149 cmds = []
150 for op, argsdict in req:
150 for op, argsdict in req:
151 # Old servers didn't properly unescape argument names. So prevent
151 # Old servers didn't properly unescape argument names. So prevent
152 # the sending of argument names that may not be decoded properly by
152 # the sending of argument names that may not be decoded properly by
153 # servers.
153 # servers.
154 assert all(escapearg(k) == k for k in argsdict)
154 assert all(escapearg(k) == k for k in argsdict)
155
155
156 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
156 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
157 for k, v in argsdict.iteritems())
157 for k, v in argsdict.iteritems())
158 cmds.append('%s %s' % (op, args))
158 cmds.append('%s %s' % (op, args))
159
159
160 return ';'.join(cmds)
160 return ';'.join(cmds)
161
161
162 # mapping of options accepted by getbundle and their types
162 # mapping of options accepted by getbundle and their types
163 #
163 #
164 # Meant to be extended by extensions. It is extensions responsibility to ensure
164 # Meant to be extended by extensions. It is extensions responsibility to ensure
165 # such options are properly processed in exchange.getbundle.
165 # such options are properly processed in exchange.getbundle.
166 #
166 #
167 # supported types are:
167 # supported types are:
168 #
168 #
169 # :nodes: list of binary nodes
169 # :nodes: list of binary nodes
170 # :csv: list of comma-separated values
170 # :csv: list of comma-separated values
171 # :scsv: list of comma-separated values return as set
171 # :scsv: list of comma-separated values return as set
172 # :plain: string with no transformation needed.
172 # :plain: string with no transformation needed.
173 gboptsmap = {'heads': 'nodes',
173 gboptsmap = {'heads': 'nodes',
174 'bookmarks': 'boolean',
174 'bookmarks': 'boolean',
175 'common': 'nodes',
175 'common': 'nodes',
176 'obsmarkers': 'boolean',
176 'obsmarkers': 'boolean',
177 'phases': 'boolean',
177 'phases': 'boolean',
178 'bundlecaps': 'scsv',
178 'bundlecaps': 'scsv',
179 'listkeys': 'csv',
179 'listkeys': 'csv',
180 'cg': 'boolean',
180 'cg': 'boolean',
181 'cbattempted': 'boolean',
181 'cbattempted': 'boolean',
182 'stream': 'boolean',
182 'stream': 'boolean',
183 }
183 }
184
184
185 # client side
185 # client side
186
186
187 class wirepeer(repository.legacypeer):
187 class wirepeer(repository.legacypeer):
188 """Client-side interface for communicating with a peer repository.
188 """Client-side interface for communicating with a peer repository.
189
189
190 Methods commonly call wire protocol commands of the same name.
190 Methods commonly call wire protocol commands of the same name.
191
191
192 See also httppeer.py and sshpeer.py for protocol-specific
192 See also httppeer.py and sshpeer.py for protocol-specific
193 implementations of this interface.
193 implementations of this interface.
194 """
194 """
195 # Begin of ipeercommands interface.
195 # Begin of ipeercommands interface.
196
196
197 def iterbatch(self):
197 def iterbatch(self):
198 return remoteiterbatcher(self)
198 return remoteiterbatcher(self)
199
199
200 @batchable
200 @batchable
201 def lookup(self, key):
201 def lookup(self, key):
202 self.requirecap('lookup', _('look up remote revision'))
202 self.requirecap('lookup', _('look up remote revision'))
203 f = future()
203 f = future()
204 yield {'key': encoding.fromlocal(key)}, f
204 yield {'key': encoding.fromlocal(key)}, f
205 d = f.value
205 d = f.value
206 success, data = d[:-1].split(" ", 1)
206 success, data = d[:-1].split(" ", 1)
207 if int(success):
207 if int(success):
208 yield bin(data)
208 yield bin(data)
209 else:
209 else:
210 self._abort(error.RepoError(data))
210 self._abort(error.RepoError(data))
211
211
212 @batchable
212 @batchable
213 def heads(self):
213 def heads(self):
214 f = future()
214 f = future()
215 yield {}, f
215 yield {}, f
216 d = f.value
216 d = f.value
217 try:
217 try:
218 yield decodelist(d[:-1])
218 yield decodelist(d[:-1])
219 except ValueError:
219 except ValueError:
220 self._abort(error.ResponseError(_("unexpected response:"), d))
220 self._abort(error.ResponseError(_("unexpected response:"), d))
221
221
222 @batchable
222 @batchable
223 def known(self, nodes):
223 def known(self, nodes):
224 f = future()
224 f = future()
225 yield {'nodes': encodelist(nodes)}, f
225 yield {'nodes': encodelist(nodes)}, f
226 d = f.value
226 d = f.value
227 try:
227 try:
228 yield [bool(int(b)) for b in d]
228 yield [bool(int(b)) for b in d]
229 except ValueError:
229 except ValueError:
230 self._abort(error.ResponseError(_("unexpected response:"), d))
230 self._abort(error.ResponseError(_("unexpected response:"), d))
231
231
232 @batchable
232 @batchable
233 def branchmap(self):
233 def branchmap(self):
234 f = future()
234 f = future()
235 yield {}, f
235 yield {}, f
236 d = f.value
236 d = f.value
237 try:
237 try:
238 branchmap = {}
238 branchmap = {}
239 for branchpart in d.splitlines():
239 for branchpart in d.splitlines():
240 branchname, branchheads = branchpart.split(' ', 1)
240 branchname, branchheads = branchpart.split(' ', 1)
241 branchname = encoding.tolocal(urlreq.unquote(branchname))
241 branchname = encoding.tolocal(urlreq.unquote(branchname))
242 branchheads = decodelist(branchheads)
242 branchheads = decodelist(branchheads)
243 branchmap[branchname] = branchheads
243 branchmap[branchname] = branchheads
244 yield branchmap
244 yield branchmap
245 except TypeError:
245 except TypeError:
246 self._abort(error.ResponseError(_("unexpected response:"), d))
246 self._abort(error.ResponseError(_("unexpected response:"), d))
247
247
248 @batchable
248 @batchable
249 def listkeys(self, namespace):
249 def listkeys(self, namespace):
250 if not self.capable('pushkey'):
250 if not self.capable('pushkey'):
251 yield {}, None
251 yield {}, None
252 f = future()
252 f = future()
253 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
253 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
254 yield {'namespace': encoding.fromlocal(namespace)}, f
254 yield {'namespace': encoding.fromlocal(namespace)}, f
255 d = f.value
255 d = f.value
256 self.ui.debug('received listkey for "%s": %i bytes\n'
256 self.ui.debug('received listkey for "%s": %i bytes\n'
257 % (namespace, len(d)))
257 % (namespace, len(d)))
258 yield pushkeymod.decodekeys(d)
258 yield pushkeymod.decodekeys(d)
259
259
260 @batchable
260 @batchable
261 def pushkey(self, namespace, key, old, new):
261 def pushkey(self, namespace, key, old, new):
262 if not self.capable('pushkey'):
262 if not self.capable('pushkey'):
263 yield False, None
263 yield False, None
264 f = future()
264 f = future()
265 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
265 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
266 yield {'namespace': encoding.fromlocal(namespace),
266 yield {'namespace': encoding.fromlocal(namespace),
267 'key': encoding.fromlocal(key),
267 'key': encoding.fromlocal(key),
268 'old': encoding.fromlocal(old),
268 'old': encoding.fromlocal(old),
269 'new': encoding.fromlocal(new)}, f
269 'new': encoding.fromlocal(new)}, f
270 d = f.value
270 d = f.value
271 d, output = d.split('\n', 1)
271 d, output = d.split('\n', 1)
272 try:
272 try:
273 d = bool(int(d))
273 d = bool(int(d))
274 except ValueError:
274 except ValueError:
275 raise error.ResponseError(
275 raise error.ResponseError(
276 _('push failed (unexpected response):'), d)
276 _('push failed (unexpected response):'), d)
277 for l in output.splitlines(True):
277 for l in output.splitlines(True):
278 self.ui.status(_('remote: '), l)
278 self.ui.status(_('remote: '), l)
279 yield d
279 yield d
280
280
281 def stream_out(self):
281 def stream_out(self):
282 return self._callstream('stream_out')
282 return self._callstream('stream_out')
283
283
284 def getbundle(self, source, **kwargs):
284 def getbundle(self, source, **kwargs):
285 kwargs = pycompat.byteskwargs(kwargs)
285 kwargs = pycompat.byteskwargs(kwargs)
286 self.requirecap('getbundle', _('look up remote changes'))
286 self.requirecap('getbundle', _('look up remote changes'))
287 opts = {}
287 opts = {}
288 bundlecaps = kwargs.get('bundlecaps')
288 bundlecaps = kwargs.get('bundlecaps')
289 if bundlecaps is not None:
289 if bundlecaps is not None:
290 kwargs['bundlecaps'] = sorted(bundlecaps)
290 kwargs['bundlecaps'] = sorted(bundlecaps)
291 else:
291 else:
292 bundlecaps = () # kwargs could have it to None
292 bundlecaps = () # kwargs could have it to None
293 for key, value in kwargs.iteritems():
293 for key, value in kwargs.iteritems():
294 if value is None:
294 if value is None:
295 continue
295 continue
296 keytype = gboptsmap.get(key)
296 keytype = gboptsmap.get(key)
297 if keytype is None:
297 if keytype is None:
298 raise error.ProgrammingError(
298 raise error.ProgrammingError(
299 'Unexpectedly None keytype for key %s' % key)
299 'Unexpectedly None keytype for key %s' % key)
300 elif keytype == 'nodes':
300 elif keytype == 'nodes':
301 value = encodelist(value)
301 value = encodelist(value)
302 elif keytype in ('csv', 'scsv'):
302 elif keytype in ('csv', 'scsv'):
303 value = ','.join(value)
303 value = ','.join(value)
304 elif keytype == 'boolean':
304 elif keytype == 'boolean':
305 value = '%i' % bool(value)
305 value = '%i' % bool(value)
306 elif keytype != 'plain':
306 elif keytype != 'plain':
307 raise KeyError('unknown getbundle option type %s'
307 raise KeyError('unknown getbundle option type %s'
308 % keytype)
308 % keytype)
309 opts[key] = value
309 opts[key] = value
310 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
310 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
311 if any((cap.startswith('HG2') for cap in bundlecaps)):
311 if any((cap.startswith('HG2') for cap in bundlecaps)):
312 return bundle2.getunbundler(self.ui, f)
312 return bundle2.getunbundler(self.ui, f)
313 else:
313 else:
314 return changegroupmod.cg1unpacker(f, 'UN')
314 return changegroupmod.cg1unpacker(f, 'UN')
315
315
316 def unbundle(self, cg, heads, url):
316 def unbundle(self, cg, heads, url):
317 '''Send cg (a readable file-like object representing the
317 '''Send cg (a readable file-like object representing the
318 changegroup to push, typically a chunkbuffer object) to the
318 changegroup to push, typically a chunkbuffer object) to the
319 remote server as a bundle.
319 remote server as a bundle.
320
320
321 When pushing a bundle10 stream, return an integer indicating the
321 When pushing a bundle10 stream, return an integer indicating the
322 result of the push (see changegroup.apply()).
322 result of the push (see changegroup.apply()).
323
323
324 When pushing a bundle20 stream, return a bundle20 stream.
324 When pushing a bundle20 stream, return a bundle20 stream.
325
325
326 `url` is the url the client thinks it's pushing to, which is
326 `url` is the url the client thinks it's pushing to, which is
327 visible to hooks.
327 visible to hooks.
328 '''
328 '''
329
329
330 if heads != ['force'] and self.capable('unbundlehash'):
330 if heads != ['force'] and self.capable('unbundlehash'):
331 heads = encodelist(['hashed',
331 heads = encodelist(['hashed',
332 hashlib.sha1(''.join(sorted(heads))).digest()])
332 hashlib.sha1(''.join(sorted(heads))).digest()])
333 else:
333 else:
334 heads = encodelist(heads)
334 heads = encodelist(heads)
335
335
336 if util.safehasattr(cg, 'deltaheader'):
336 if util.safehasattr(cg, 'deltaheader'):
337 # this a bundle10, do the old style call sequence
337 # this a bundle10, do the old style call sequence
338 ret, output = self._callpush("unbundle", cg, heads=heads)
338 ret, output = self._callpush("unbundle", cg, heads=heads)
339 if ret == "":
339 if ret == "":
340 raise error.ResponseError(
340 raise error.ResponseError(
341 _('push failed:'), output)
341 _('push failed:'), output)
342 try:
342 try:
343 ret = int(ret)
343 ret = int(ret)
344 except ValueError:
344 except ValueError:
345 raise error.ResponseError(
345 raise error.ResponseError(
346 _('push failed (unexpected response):'), ret)
346 _('push failed (unexpected response):'), ret)
347
347
348 for l in output.splitlines(True):
348 for l in output.splitlines(True):
349 self.ui.status(_('remote: '), l)
349 self.ui.status(_('remote: '), l)
350 else:
350 else:
351 # bundle2 push. Send a stream, fetch a stream.
351 # bundle2 push. Send a stream, fetch a stream.
352 stream = self._calltwowaystream('unbundle', cg, heads=heads)
352 stream = self._calltwowaystream('unbundle', cg, heads=heads)
353 ret = bundle2.getunbundler(self.ui, stream)
353 ret = bundle2.getunbundler(self.ui, stream)
354 return ret
354 return ret
355
355
356 # End of ipeercommands interface.
356 # End of ipeercommands interface.
357
357
358 # Begin of ipeerlegacycommands interface.
358 # Begin of ipeerlegacycommands interface.
359
359
360 def branches(self, nodes):
360 def branches(self, nodes):
361 n = encodelist(nodes)
361 n = encodelist(nodes)
362 d = self._call("branches", nodes=n)
362 d = self._call("branches", nodes=n)
363 try:
363 try:
364 br = [tuple(decodelist(b)) for b in d.splitlines()]
364 br = [tuple(decodelist(b)) for b in d.splitlines()]
365 return br
365 return br
366 except ValueError:
366 except ValueError:
367 self._abort(error.ResponseError(_("unexpected response:"), d))
367 self._abort(error.ResponseError(_("unexpected response:"), d))
368
368
369 def between(self, pairs):
369 def between(self, pairs):
370 batch = 8 # avoid giant requests
370 batch = 8 # avoid giant requests
371 r = []
371 r = []
372 for i in xrange(0, len(pairs), batch):
372 for i in xrange(0, len(pairs), batch):
373 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
373 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
374 d = self._call("between", pairs=n)
374 d = self._call("between", pairs=n)
375 try:
375 try:
376 r.extend(l and decodelist(l) or [] for l in d.splitlines())
376 r.extend(l and decodelist(l) or [] for l in d.splitlines())
377 except ValueError:
377 except ValueError:
378 self._abort(error.ResponseError(_("unexpected response:"), d))
378 self._abort(error.ResponseError(_("unexpected response:"), d))
379 return r
379 return r
380
380
381 def changegroup(self, nodes, kind):
381 def changegroup(self, nodes, kind):
382 n = encodelist(nodes)
382 n = encodelist(nodes)
383 f = self._callcompressable("changegroup", roots=n)
383 f = self._callcompressable("changegroup", roots=n)
384 return changegroupmod.cg1unpacker(f, 'UN')
384 return changegroupmod.cg1unpacker(f, 'UN')
385
385
386 def changegroupsubset(self, bases, heads, kind):
386 def changegroupsubset(self, bases, heads, kind):
387 self.requirecap('changegroupsubset', _('look up remote changes'))
387 self.requirecap('changegroupsubset', _('look up remote changes'))
388 bases = encodelist(bases)
388 bases = encodelist(bases)
389 heads = encodelist(heads)
389 heads = encodelist(heads)
390 f = self._callcompressable("changegroupsubset",
390 f = self._callcompressable("changegroupsubset",
391 bases=bases, heads=heads)
391 bases=bases, heads=heads)
392 return changegroupmod.cg1unpacker(f, 'UN')
392 return changegroupmod.cg1unpacker(f, 'UN')
393
393
394 # End of ipeerlegacycommands interface.
394 # End of ipeerlegacycommands interface.
395
395
396 def _submitbatch(self, req):
396 def _submitbatch(self, req):
397 """run batch request <req> on the server
397 """run batch request <req> on the server
398
398
399 Returns an iterator of the raw responses from the server.
399 Returns an iterator of the raw responses from the server.
400 """
400 """
401 ui = self.ui
401 ui = self.ui
402 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
402 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
403 ui.debug('devel-peer-request: batched-content\n')
403 ui.debug('devel-peer-request: batched-content\n')
404 for op, args in req:
404 for op, args in req:
405 msg = 'devel-peer-request: - %s (%d arguments)\n'
405 msg = 'devel-peer-request: - %s (%d arguments)\n'
406 ui.debug(msg % (op, len(args)))
406 ui.debug(msg % (op, len(args)))
407
407
408 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
408 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
409 chunk = rsp.read(1024)
409 chunk = rsp.read(1024)
410 work = [chunk]
410 work = [chunk]
411 while chunk:
411 while chunk:
412 while ';' not in chunk and chunk:
412 while ';' not in chunk and chunk:
413 chunk = rsp.read(1024)
413 chunk = rsp.read(1024)
414 work.append(chunk)
414 work.append(chunk)
415 merged = ''.join(work)
415 merged = ''.join(work)
416 while ';' in merged:
416 while ';' in merged:
417 one, merged = merged.split(';', 1)
417 one, merged = merged.split(';', 1)
418 yield unescapearg(one)
418 yield unescapearg(one)
419 chunk = rsp.read(1024)
419 chunk = rsp.read(1024)
420 work = [merged, chunk]
420 work = [merged, chunk]
421 yield unescapearg(''.join(work))
421 yield unescapearg(''.join(work))
422
422
423 def _submitone(self, op, args):
423 def _submitone(self, op, args):
424 return self._call(op, **pycompat.strkwargs(args))
424 return self._call(op, **pycompat.strkwargs(args))
425
425
426 def debugwireargs(self, one, two, three=None, four=None, five=None):
426 def debugwireargs(self, one, two, three=None, four=None, five=None):
427 # don't pass optional arguments left at their default value
427 # don't pass optional arguments left at their default value
428 opts = {}
428 opts = {}
429 if three is not None:
429 if three is not None:
430 opts[r'three'] = three
430 opts[r'three'] = three
431 if four is not None:
431 if four is not None:
432 opts[r'four'] = four
432 opts[r'four'] = four
433 return self._call('debugwireargs', one=one, two=two, **opts)
433 return self._call('debugwireargs', one=one, two=two, **opts)
434
434
435 def _call(self, cmd, **args):
435 def _call(self, cmd, **args):
436 """execute <cmd> on the server
436 """execute <cmd> on the server
437
437
438 The command is expected to return a simple string.
438 The command is expected to return a simple string.
439
439
440 returns the server reply as a string."""
440 returns the server reply as a string."""
441 raise NotImplementedError()
441 raise NotImplementedError()
442
442
443 def _callstream(self, cmd, **args):
443 def _callstream(self, cmd, **args):
444 """execute <cmd> on the server
444 """execute <cmd> on the server
445
445
446 The command is expected to return a stream. Note that if the
446 The command is expected to return a stream. Note that if the
447 command doesn't return a stream, _callstream behaves
447 command doesn't return a stream, _callstream behaves
448 differently for ssh and http peers.
448 differently for ssh and http peers.
449
449
450 returns the server reply as a file like object.
450 returns the server reply as a file like object.
451 """
451 """
452 raise NotImplementedError()
452 raise NotImplementedError()
453
453
454 def _callcompressable(self, cmd, **args):
454 def _callcompressable(self, cmd, **args):
455 """execute <cmd> on the server
455 """execute <cmd> on the server
456
456
457 The command is expected to return a stream.
457 The command is expected to return a stream.
458
458
459 The stream may have been compressed in some implementations. This
459 The stream may have been compressed in some implementations. This
460 function takes care of the decompression. This is the only difference
460 function takes care of the decompression. This is the only difference
461 with _callstream.
461 with _callstream.
462
462
463 returns the server reply as a file like object.
463 returns the server reply as a file like object.
464 """
464 """
465 raise NotImplementedError()
465 raise NotImplementedError()
466
466
467 def _callpush(self, cmd, fp, **args):
467 def _callpush(self, cmd, fp, **args):
468 """execute a <cmd> on server
468 """execute a <cmd> on server
469
469
470 The command is expected to be related to a push. Push has a special
470 The command is expected to be related to a push. Push has a special
471 return method.
471 return method.
472
472
473 returns the server reply as a (ret, output) tuple. ret is either
473 returns the server reply as a (ret, output) tuple. ret is either
474 empty (error) or a stringified int.
474 empty (error) or a stringified int.
475 """
475 """
476 raise NotImplementedError()
476 raise NotImplementedError()
477
477
478 def _calltwowaystream(self, cmd, fp, **args):
478 def _calltwowaystream(self, cmd, fp, **args):
479 """execute <cmd> on server
479 """execute <cmd> on server
480
480
481 The command will send a stream to the server and get a stream in reply.
481 The command will send a stream to the server and get a stream in reply.
482 """
482 """
483 raise NotImplementedError()
483 raise NotImplementedError()
484
484
485 def _abort(self, exception):
485 def _abort(self, exception):
486 """clearly abort the wire protocol connection and raise the exception
486 """clearly abort the wire protocol connection and raise the exception
487 """
487 """
488 raise NotImplementedError()
488 raise NotImplementedError()
489
489
490 # server side
490 # server side
491
491
492 # wire protocol command can either return a string or one of these classes.
492 # wire protocol command can either return a string or one of these classes.
493
493
494 def getdispatchrepo(repo, proto, command):
494 def getdispatchrepo(repo, proto, command):
495 """Obtain the repo used for processing wire protocol commands.
495 """Obtain the repo used for processing wire protocol commands.
496
496
497 The intent of this function is to serve as a monkeypatch point for
497 The intent of this function is to serve as a monkeypatch point for
498 extensions that need commands to operate on different repo views under
498 extensions that need commands to operate on different repo views under
499 specialized circumstances.
499 specialized circumstances.
500 """
500 """
501 return repo.filtered('served')
501 return repo.filtered('served')
502
502
503 def dispatch(repo, proto, command):
503 def dispatch(repo, proto, command):
504 repo = getdispatchrepo(repo, proto, command)
504 repo = getdispatchrepo(repo, proto, command)
505
505
506 transportversion = wireprototypes.TRANSPORTS[proto.name]['version']
506 transportversion = wireprototypes.TRANSPORTS[proto.name]['version']
507 commandtable = commandsv2 if transportversion == 2 else commands
507 commandtable = commandsv2 if transportversion == 2 else commands
508 func, spec = commandtable[command]
508 func, spec = commandtable[command]
509
509
510 args = proto.getargs(spec)
510 args = proto.getargs(spec)
511 return func(repo, proto, *args)
511 return func(repo, proto, *args)
512
512
513 def options(cmd, keys, others):
513 def options(cmd, keys, others):
514 opts = {}
514 opts = {}
515 for k in keys:
515 for k in keys:
516 if k in others:
516 if k in others:
517 opts[k] = others[k]
517 opts[k] = others[k]
518 del others[k]
518 del others[k]
519 if others:
519 if others:
520 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
520 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
521 % (cmd, ",".join(others)))
521 % (cmd, ",".join(others)))
522 return opts
522 return opts
523
523
524 def bundle1allowed(repo, action):
524 def bundle1allowed(repo, action):
525 """Whether a bundle1 operation is allowed from the server.
525 """Whether a bundle1 operation is allowed from the server.
526
526
527 Priority is:
527 Priority is:
528
528
529 1. server.bundle1gd.<action> (if generaldelta active)
529 1. server.bundle1gd.<action> (if generaldelta active)
530 2. server.bundle1.<action>
530 2. server.bundle1.<action>
531 3. server.bundle1gd (if generaldelta active)
531 3. server.bundle1gd (if generaldelta active)
532 4. server.bundle1
532 4. server.bundle1
533 """
533 """
534 ui = repo.ui
534 ui = repo.ui
535 gd = 'generaldelta' in repo.requirements
535 gd = 'generaldelta' in repo.requirements
536
536
537 if gd:
537 if gd:
538 v = ui.configbool('server', 'bundle1gd.%s' % action)
538 v = ui.configbool('server', 'bundle1gd.%s' % action)
539 if v is not None:
539 if v is not None:
540 return v
540 return v
541
541
542 v = ui.configbool('server', 'bundle1.%s' % action)
542 v = ui.configbool('server', 'bundle1.%s' % action)
543 if v is not None:
543 if v is not None:
544 return v
544 return v
545
545
546 if gd:
546 if gd:
547 v = ui.configbool('server', 'bundle1gd')
547 v = ui.configbool('server', 'bundle1gd')
548 if v is not None:
548 if v is not None:
549 return v
549 return v
550
550
551 return ui.configbool('server', 'bundle1')
551 return ui.configbool('server', 'bundle1')
552
552
553 def supportedcompengines(ui, role):
553 def supportedcompengines(ui, role):
554 """Obtain the list of supported compression engines for a request."""
554 """Obtain the list of supported compression engines for a request."""
555 assert role in (util.CLIENTROLE, util.SERVERROLE)
555 assert role in (util.CLIENTROLE, util.SERVERROLE)
556
556
557 compengines = util.compengines.supportedwireengines(role)
557 compengines = util.compengines.supportedwireengines(role)
558
558
559 # Allow config to override default list and ordering.
559 # Allow config to override default list and ordering.
560 if role == util.SERVERROLE:
560 if role == util.SERVERROLE:
561 configengines = ui.configlist('server', 'compressionengines')
561 configengines = ui.configlist('server', 'compressionengines')
562 config = 'server.compressionengines'
562 config = 'server.compressionengines'
563 else:
563 else:
564 # This is currently implemented mainly to facilitate testing. In most
564 # This is currently implemented mainly to facilitate testing. In most
565 # cases, the server should be in charge of choosing a compression engine
565 # cases, the server should be in charge of choosing a compression engine
566 # because a server has the most to lose from a sub-optimal choice. (e.g.
566 # because a server has the most to lose from a sub-optimal choice. (e.g.
567 # CPU DoS due to an expensive engine or a network DoS due to poor
567 # CPU DoS due to an expensive engine or a network DoS due to poor
568 # compression ratio).
568 # compression ratio).
569 configengines = ui.configlist('experimental',
569 configengines = ui.configlist('experimental',
570 'clientcompressionengines')
570 'clientcompressionengines')
571 config = 'experimental.clientcompressionengines'
571 config = 'experimental.clientcompressionengines'
572
572
573 # No explicit config. Filter out the ones that aren't supposed to be
573 # No explicit config. Filter out the ones that aren't supposed to be
574 # advertised and return default ordering.
574 # advertised and return default ordering.
575 if not configengines:
575 if not configengines:
576 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
576 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
577 return [e for e in compengines
577 return [e for e in compengines
578 if getattr(e.wireprotosupport(), attr) > 0]
578 if getattr(e.wireprotosupport(), attr) > 0]
579
579
580 # If compression engines are listed in the config, assume there is a good
580 # If compression engines are listed in the config, assume there is a good
581 # reason for it (like server operators wanting to achieve specific
581 # reason for it (like server operators wanting to achieve specific
582 # performance characteristics). So fail fast if the config references
582 # performance characteristics). So fail fast if the config references
583 # unusable compression engines.
583 # unusable compression engines.
584 validnames = set(e.name() for e in compengines)
584 validnames = set(e.name() for e in compengines)
585 invalidnames = set(e for e in configengines if e not in validnames)
585 invalidnames = set(e for e in configengines if e not in validnames)
586 if invalidnames:
586 if invalidnames:
587 raise error.Abort(_('invalid compression engine defined in %s: %s') %
587 raise error.Abort(_('invalid compression engine defined in %s: %s') %
588 (config, ', '.join(sorted(invalidnames))))
588 (config, ', '.join(sorted(invalidnames))))
589
589
590 compengines = [e for e in compengines if e.name() in configengines]
590 compengines = [e for e in compengines if e.name() in configengines]
591 compengines = sorted(compengines,
591 compengines = sorted(compengines,
592 key=lambda e: configengines.index(e.name()))
592 key=lambda e: configengines.index(e.name()))
593
593
594 if not compengines:
594 if not compengines:
595 raise error.Abort(_('%s config option does not specify any known '
595 raise error.Abort(_('%s config option does not specify any known '
596 'compression engines') % config,
596 'compression engines') % config,
597 hint=_('usable compression engines: %s') %
597 hint=_('usable compression engines: %s') %
598 ', '.sorted(validnames))
598 ', '.sorted(validnames))
599
599
600 return compengines
600 return compengines
601
601
602 class commandentry(object):
602 class commandentry(object):
603 """Represents a declared wire protocol command."""
603 """Represents a declared wire protocol command."""
604 def __init__(self, func, args='', transports=None,
604 def __init__(self, func, args='', transports=None,
605 permission='push'):
605 permission='push'):
606 self.func = func
606 self.func = func
607 self.args = args
607 self.args = args
608 self.transports = transports or set()
608 self.transports = transports or set()
609 self.permission = permission
609 self.permission = permission
610
610
611 def _merge(self, func, args):
611 def _merge(self, func, args):
612 """Merge this instance with an incoming 2-tuple.
612 """Merge this instance with an incoming 2-tuple.
613
613
614 This is called when a caller using the old 2-tuple API attempts
614 This is called when a caller using the old 2-tuple API attempts
615 to replace an instance. The incoming values are merged with
615 to replace an instance. The incoming values are merged with
616 data not captured by the 2-tuple and a new instance containing
616 data not captured by the 2-tuple and a new instance containing
617 the union of the two objects is returned.
617 the union of the two objects is returned.
618 """
618 """
619 return commandentry(func, args=args, transports=set(self.transports),
619 return commandentry(func, args=args, transports=set(self.transports),
620 permission=self.permission)
620 permission=self.permission)
621
621
622 # Old code treats instances as 2-tuples. So expose that interface.
622 # Old code treats instances as 2-tuples. So expose that interface.
623 def __iter__(self):
623 def __iter__(self):
624 yield self.func
624 yield self.func
625 yield self.args
625 yield self.args
626
626
627 def __getitem__(self, i):
627 def __getitem__(self, i):
628 if i == 0:
628 if i == 0:
629 return self.func
629 return self.func
630 elif i == 1:
630 elif i == 1:
631 return self.args
631 return self.args
632 else:
632 else:
633 raise IndexError('can only access elements 0 and 1')
633 raise IndexError('can only access elements 0 and 1')
634
634
635 class commanddict(dict):
635 class commanddict(dict):
636 """Container for registered wire protocol commands.
636 """Container for registered wire protocol commands.
637
637
638 It behaves like a dict. But __setitem__ is overwritten to allow silent
638 It behaves like a dict. But __setitem__ is overwritten to allow silent
639 coercion of values from 2-tuples for API compatibility.
639 coercion of values from 2-tuples for API compatibility.
640 """
640 """
641 def __setitem__(self, k, v):
641 def __setitem__(self, k, v):
642 if isinstance(v, commandentry):
642 if isinstance(v, commandentry):
643 pass
643 pass
644 # Cast 2-tuples to commandentry instances.
644 # Cast 2-tuples to commandentry instances.
645 elif isinstance(v, tuple):
645 elif isinstance(v, tuple):
646 if len(v) != 2:
646 if len(v) != 2:
647 raise ValueError('command tuples must have exactly 2 elements')
647 raise ValueError('command tuples must have exactly 2 elements')
648
648
649 # It is common for extensions to wrap wire protocol commands via
649 # It is common for extensions to wrap wire protocol commands via
650 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
650 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
651 # doing this aren't aware of the new API that uses objects to store
651 # doing this aren't aware of the new API that uses objects to store
652 # command entries, we automatically merge old state with new.
652 # command entries, we automatically merge old state with new.
653 if k in self:
653 if k in self:
654 v = self[k]._merge(v[0], v[1])
654 v = self[k]._merge(v[0], v[1])
655 else:
655 else:
656 # Use default values from @wireprotocommand.
656 # Use default values from @wireprotocommand.
657 v = commandentry(v[0], args=v[1],
657 v = commandentry(v[0], args=v[1],
658 transports=set(wireprototypes.TRANSPORTS),
658 transports=set(wireprototypes.TRANSPORTS),
659 permission='push')
659 permission='push')
660 else:
660 else:
661 raise ValueError('command entries must be commandentry instances '
661 raise ValueError('command entries must be commandentry instances '
662 'or 2-tuples')
662 'or 2-tuples')
663
663
664 return super(commanddict, self).__setitem__(k, v)
664 return super(commanddict, self).__setitem__(k, v)
665
665
666 def commandavailable(self, command, proto):
666 def commandavailable(self, command, proto):
667 """Determine if a command is available for the requested protocol."""
667 """Determine if a command is available for the requested protocol."""
668 assert proto.name in wireprototypes.TRANSPORTS
668 assert proto.name in wireprototypes.TRANSPORTS
669
669
670 entry = self.get(command)
670 entry = self.get(command)
671
671
672 if not entry:
672 if not entry:
673 return False
673 return False
674
674
675 if proto.name not in entry.transports:
675 if proto.name not in entry.transports:
676 return False
676 return False
677
677
678 return True
678 return True
679
679
680 # Constants specifying which transports a wire protocol command should be
680 # Constants specifying which transports a wire protocol command should be
681 # available on. For use with @wireprotocommand.
681 # available on. For use with @wireprotocommand.
682 POLICY_ALL = 'all'
682 POLICY_ALL = 'all'
683 POLICY_V1_ONLY = 'v1-only'
683 POLICY_V1_ONLY = 'v1-only'
684 POLICY_V2_ONLY = 'v2-only'
684 POLICY_V2_ONLY = 'v2-only'
685
685
686 # For version 1 transports.
686 # For version 1 transports.
687 commands = commanddict()
687 commands = commanddict()
688
688
689 # For version 2 transports.
689 # For version 2 transports.
690 commandsv2 = commanddict()
690 commandsv2 = commanddict()
691
691
692 def wireprotocommand(name, args='', transportpolicy=POLICY_ALL,
692 def wireprotocommand(name, args='', transportpolicy=POLICY_ALL,
693 permission='push'):
693 permission='push'):
694 """Decorator to declare a wire protocol command.
694 """Decorator to declare a wire protocol command.
695
695
696 ``name`` is the name of the wire protocol command being provided.
696 ``name`` is the name of the wire protocol command being provided.
697
697
698 ``args`` is a space-delimited list of named arguments that the command
698 ``args`` is a space-delimited list of named arguments that the command
699 accepts. ``*`` is a special value that says to accept all arguments.
699 accepts. ``*`` is a special value that says to accept all arguments.
700
700
701 ``transportpolicy`` is a POLICY_* constant denoting which transports
701 ``transportpolicy`` is a POLICY_* constant denoting which transports
702 this wire protocol command should be exposed to. By default, commands
702 this wire protocol command should be exposed to. By default, commands
703 are exposed to all wire protocol transports.
703 are exposed to all wire protocol transports.
704
704
705 ``permission`` defines the permission type needed to run this command.
705 ``permission`` defines the permission type needed to run this command.
706 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
706 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
707 respectively. Default is to assume command requires ``push`` permissions
707 respectively. Default is to assume command requires ``push`` permissions
708 because otherwise commands not declaring their permissions could modify
708 because otherwise commands not declaring their permissions could modify
709 a repository that is supposed to be read-only.
709 a repository that is supposed to be read-only.
710 """
710 """
711 if transportpolicy == POLICY_ALL:
711 if transportpolicy == POLICY_ALL:
712 transports = set(wireprototypes.TRANSPORTS)
712 transports = set(wireprototypes.TRANSPORTS)
713 transportversions = {1, 2}
713 transportversions = {1, 2}
714 elif transportpolicy == POLICY_V1_ONLY:
714 elif transportpolicy == POLICY_V1_ONLY:
715 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
715 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
716 if v['version'] == 1}
716 if v['version'] == 1}
717 transportversions = {1}
717 transportversions = {1}
718 elif transportpolicy == POLICY_V2_ONLY:
718 elif transportpolicy == POLICY_V2_ONLY:
719 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
719 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
720 if v['version'] == 2}
720 if v['version'] == 2}
721 transportversions = {2}
721 transportversions = {2}
722 else:
722 else:
723 raise error.ProgrammingError('invalid transport policy value: %s' %
723 raise error.ProgrammingError('invalid transport policy value: %s' %
724 transportpolicy)
724 transportpolicy)
725
725
726 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
726 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
727 # SSHv2.
727 # SSHv2.
728 # TODO undo this hack when SSH is using the unified frame protocol.
728 # TODO undo this hack when SSH is using the unified frame protocol.
729 if name == b'batch':
729 if name == b'batch':
730 transports.add(wireprototypes.SSHV2)
730 transports.add(wireprototypes.SSHV2)
731
731
732 if permission not in ('push', 'pull'):
732 if permission not in ('push', 'pull'):
733 raise error.ProgrammingError('invalid wire protocol permission; '
733 raise error.ProgrammingError('invalid wire protocol permission; '
734 'got %s; expected "push" or "pull"' %
734 'got %s; expected "push" or "pull"' %
735 permission)
735 permission)
736
736
737 def register(func):
737 def register(func):
738 if 1 in transportversions:
738 if 1 in transportversions:
739 if name in commands:
739 if name in commands:
740 raise error.ProgrammingError('%s command already registered '
740 raise error.ProgrammingError('%s command already registered '
741 'for version 1' % name)
741 'for version 1' % name)
742 commands[name] = commandentry(func, args=args,
742 commands[name] = commandentry(func, args=args,
743 transports=transports,
743 transports=transports,
744 permission=permission)
744 permission=permission)
745 if 2 in transportversions:
745 if 2 in transportversions:
746 if name in commandsv2:
746 if name in commandsv2:
747 raise error.ProgrammingError('%s command already registered '
747 raise error.ProgrammingError('%s command already registered '
748 'for version 2' % name)
748 'for version 2' % name)
749 commandsv2[name] = commandentry(func, args=args,
749 commandsv2[name] = commandentry(func, args=args,
750 transports=transports,
750 transports=transports,
751 permission=permission)
751 permission=permission)
752
752
753 return func
753 return func
754 return register
754 return register
755
755
756 # TODO define a more appropriate permissions type to use for this.
756 # TODO define a more appropriate permissions type to use for this.
757 @wireprotocommand('batch', 'cmds *', permission='pull',
757 @wireprotocommand('batch', 'cmds *', permission='pull',
758 transportpolicy=POLICY_V1_ONLY)
758 transportpolicy=POLICY_V1_ONLY)
759 def batch(repo, proto, cmds, others):
759 def batch(repo, proto, cmds, others):
760 repo = repo.filtered("served")
760 repo = repo.filtered("served")
761 res = []
761 res = []
762 for pair in cmds.split(';'):
762 for pair in cmds.split(';'):
763 op, args = pair.split(' ', 1)
763 op, args = pair.split(' ', 1)
764 vals = {}
764 vals = {}
765 for a in args.split(','):
765 for a in args.split(','):
766 if a:
766 if a:
767 n, v = a.split('=')
767 n, v = a.split('=')
768 vals[unescapearg(n)] = unescapearg(v)
768 vals[unescapearg(n)] = unescapearg(v)
769 func, spec = commands[op]
769 func, spec = commands[op]
770
770
771 # Validate that client has permissions to perform this command.
771 # Validate that client has permissions to perform this command.
772 perm = commands[op].permission
772 perm = commands[op].permission
773 assert perm in ('push', 'pull')
773 assert perm in ('push', 'pull')
774 proto.checkperm(perm)
774 proto.checkperm(perm)
775
775
776 if spec:
776 if spec:
777 keys = spec.split()
777 keys = spec.split()
778 data = {}
778 data = {}
779 for k in keys:
779 for k in keys:
780 if k == '*':
780 if k == '*':
781 star = {}
781 star = {}
782 for key in vals.keys():
782 for key in vals.keys():
783 if key not in keys:
783 if key not in keys:
784 star[key] = vals[key]
784 star[key] = vals[key]
785 data['*'] = star
785 data['*'] = star
786 else:
786 else:
787 data[k] = vals[k]
787 data[k] = vals[k]
788 result = func(repo, proto, *[data[k] for k in keys])
788 result = func(repo, proto, *[data[k] for k in keys])
789 else:
789 else:
790 result = func(repo, proto)
790 result = func(repo, proto)
791 if isinstance(result, wireprototypes.ooberror):
791 if isinstance(result, wireprototypes.ooberror):
792 return result
792 return result
793
793
794 # For now, all batchable commands must return bytesresponse or
794 # For now, all batchable commands must return bytesresponse or
795 # raw bytes (for backwards compatibility).
795 # raw bytes (for backwards compatibility).
796 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
796 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
797 if isinstance(result, wireprototypes.bytesresponse):
797 if isinstance(result, wireprototypes.bytesresponse):
798 result = result.data
798 result = result.data
799 res.append(escapearg(result))
799 res.append(escapearg(result))
800
800
801 return wireprototypes.bytesresponse(';'.join(res))
801 return wireprototypes.bytesresponse(';'.join(res))
802
802
803 @wireprotocommand('between', 'pairs', transportpolicy=POLICY_V1_ONLY,
803 @wireprotocommand('between', 'pairs', transportpolicy=POLICY_V1_ONLY,
804 permission='pull')
804 permission='pull')
805 def between(repo, proto, pairs):
805 def between(repo, proto, pairs):
806 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
806 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
807 r = []
807 r = []
808 for b in repo.between(pairs):
808 for b in repo.between(pairs):
809 r.append(encodelist(b) + "\n")
809 r.append(encodelist(b) + "\n")
810
810
811 return wireprototypes.bytesresponse(''.join(r))
811 return wireprototypes.bytesresponse(''.join(r))
812
812
813 @wireprotocommand('branchmap', permission='pull')
813 @wireprotocommand('branchmap', permission='pull')
814 def branchmap(repo, proto):
814 def branchmap(repo, proto):
815 branchmap = repo.branchmap()
815 branchmap = repo.branchmap()
816 heads = []
816 heads = []
817 for branch, nodes in branchmap.iteritems():
817 for branch, nodes in branchmap.iteritems():
818 branchname = urlreq.quote(encoding.fromlocal(branch))
818 branchname = urlreq.quote(encoding.fromlocal(branch))
819 branchnodes = encodelist(nodes)
819 branchnodes = encodelist(nodes)
820 heads.append('%s %s' % (branchname, branchnodes))
820 heads.append('%s %s' % (branchname, branchnodes))
821
821
822 return wireprototypes.bytesresponse('\n'.join(heads))
822 return wireprototypes.bytesresponse('\n'.join(heads))
823
823
824 @wireprotocommand('branches', 'nodes', transportpolicy=POLICY_V1_ONLY,
824 @wireprotocommand('branches', 'nodes', transportpolicy=POLICY_V1_ONLY,
825 permission='pull')
825 permission='pull')
826 def branches(repo, proto, nodes):
826 def branches(repo, proto, nodes):
827 nodes = decodelist(nodes)
827 nodes = decodelist(nodes)
828 r = []
828 r = []
829 for b in repo.branches(nodes):
829 for b in repo.branches(nodes):
830 r.append(encodelist(b) + "\n")
830 r.append(encodelist(b) + "\n")
831
831
832 return wireprototypes.bytesresponse(''.join(r))
832 return wireprototypes.bytesresponse(''.join(r))
833
833
834 @wireprotocommand('clonebundles', '', permission='pull')
834 @wireprotocommand('clonebundles', '', permission='pull')
835 def clonebundles(repo, proto):
835 def clonebundles(repo, proto):
836 """Server command for returning info for available bundles to seed clones.
836 """Server command for returning info for available bundles to seed clones.
837
837
838 Clients will parse this response and determine what bundle to fetch.
838 Clients will parse this response and determine what bundle to fetch.
839
839
840 Extensions may wrap this command to filter or dynamically emit data
840 Extensions may wrap this command to filter or dynamically emit data
841 depending on the request. e.g. you could advertise URLs for the closest
841 depending on the request. e.g. you could advertise URLs for the closest
842 data center given the client's IP address.
842 data center given the client's IP address.
843 """
843 """
844 return wireprototypes.bytesresponse(
844 return wireprototypes.bytesresponse(
845 repo.vfs.tryread('clonebundles.manifest'))
845 repo.vfs.tryread('clonebundles.manifest'))
846
846
847 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
847 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
848 'known', 'getbundle', 'unbundlehash']
848 'known', 'getbundle', 'unbundlehash']
849
849
850 def _capabilities(repo, proto):
850 def _capabilities(repo, proto):
851 """return a list of capabilities for a repo
851 """return a list of capabilities for a repo
852
852
853 This function exists to allow extensions to easily wrap capabilities
853 This function exists to allow extensions to easily wrap capabilities
854 computation
854 computation
855
855
856 - returns a lists: easy to alter
856 - returns a lists: easy to alter
857 - change done here will be propagated to both `capabilities` and `hello`
857 - change done here will be propagated to both `capabilities` and `hello`
858 command without any other action needed.
858 command without any other action needed.
859 """
859 """
860 # copy to prevent modification of the global list
860 # copy to prevent modification of the global list
861 caps = list(wireprotocaps)
861 caps = list(wireprotocaps)
862
862
863 # Command of same name as capability isn't exposed to version 1 of
863 # Command of same name as capability isn't exposed to version 1 of
864 # transports. So conditionally add it.
864 # transports. So conditionally add it.
865 if commands.commandavailable('changegroupsubset', proto):
865 if commands.commandavailable('changegroupsubset', proto):
866 caps.append('changegroupsubset')
866 caps.append('changegroupsubset')
867
867
868 if streamclone.allowservergeneration(repo):
868 if streamclone.allowservergeneration(repo):
869 if repo.ui.configbool('server', 'preferuncompressed'):
869 if repo.ui.configbool('server', 'preferuncompressed'):
870 caps.append('stream-preferred')
870 caps.append('stream-preferred')
871 requiredformats = repo.requirements & repo.supportedformats
871 requiredformats = repo.requirements & repo.supportedformats
872 # if our local revlogs are just revlogv1, add 'stream' cap
872 # if our local revlogs are just revlogv1, add 'stream' cap
873 if not requiredformats - {'revlogv1'}:
873 if not requiredformats - {'revlogv1'}:
874 caps.append('stream')
874 caps.append('stream')
875 # otherwise, add 'streamreqs' detailing our local revlog format
875 # otherwise, add 'streamreqs' detailing our local revlog format
876 else:
876 else:
877 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
877 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
878 if repo.ui.configbool('experimental', 'bundle2-advertise'):
878 if repo.ui.configbool('experimental', 'bundle2-advertise'):
879 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
879 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
880 caps.append('bundle2=' + urlreq.quote(capsblob))
880 caps.append('bundle2=' + urlreq.quote(capsblob))
881 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
881 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
882
882
883 return proto.addcapabilities(repo, caps)
883 return proto.addcapabilities(repo, caps)
884
884
885 # If you are writing an extension and consider wrapping this function. Wrap
885 # If you are writing an extension and consider wrapping this function. Wrap
886 # `_capabilities` instead.
886 # `_capabilities` instead.
887 @wireprotocommand('capabilities', permission='pull')
887 @wireprotocommand('capabilities', permission='pull')
888 def capabilities(repo, proto):
888 def capabilities(repo, proto):
889 return wireprototypes.bytesresponse(' '.join(_capabilities(repo, proto)))
889 return wireprototypes.bytesresponse(' '.join(_capabilities(repo, proto)))
890
890
891 @wireprotocommand('changegroup', 'roots', transportpolicy=POLICY_V1_ONLY,
891 @wireprotocommand('changegroup', 'roots', transportpolicy=POLICY_V1_ONLY,
892 permission='pull')
892 permission='pull')
893 def changegroup(repo, proto, roots):
893 def changegroup(repo, proto, roots):
894 nodes = decodelist(roots)
894 nodes = decodelist(roots)
895 outgoing = discovery.outgoing(repo, missingroots=nodes,
895 outgoing = discovery.outgoing(repo, missingroots=nodes,
896 missingheads=repo.heads())
896 missingheads=repo.heads())
897 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
897 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
898 gen = iter(lambda: cg.read(32768), '')
898 gen = iter(lambda: cg.read(32768), '')
899 return wireprototypes.streamres(gen=gen)
899 return wireprototypes.streamres(gen=gen)
900
900
901 @wireprotocommand('changegroupsubset', 'bases heads',
901 @wireprotocommand('changegroupsubset', 'bases heads',
902 transportpolicy=POLICY_V1_ONLY,
902 transportpolicy=POLICY_V1_ONLY,
903 permission='pull')
903 permission='pull')
904 def changegroupsubset(repo, proto, bases, heads):
904 def changegroupsubset(repo, proto, bases, heads):
905 bases = decodelist(bases)
905 bases = decodelist(bases)
906 heads = decodelist(heads)
906 heads = decodelist(heads)
907 outgoing = discovery.outgoing(repo, missingroots=bases,
907 outgoing = discovery.outgoing(repo, missingroots=bases,
908 missingheads=heads)
908 missingheads=heads)
909 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
909 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
910 gen = iter(lambda: cg.read(32768), '')
910 gen = iter(lambda: cg.read(32768), '')
911 return wireprototypes.streamres(gen=gen)
911 return wireprototypes.streamres(gen=gen)
912
912
913 @wireprotocommand('debugwireargs', 'one two *',
913 @wireprotocommand('debugwireargs', 'one two *',
914 permission='pull')
914 permission='pull')
915 def debugwireargs(repo, proto, one, two, others):
915 def debugwireargs(repo, proto, one, two, others):
916 # only accept optional args from the known set
916 # only accept optional args from the known set
917 opts = options('debugwireargs', ['three', 'four'], others)
917 opts = options('debugwireargs', ['three', 'four'], others)
918 return wireprototypes.bytesresponse(repo.debugwireargs(
918 return wireprototypes.bytesresponse(repo.debugwireargs(
919 one, two, **pycompat.strkwargs(opts)))
919 one, two, **pycompat.strkwargs(opts)))
920
920
921 @wireprotocommand('getbundle', '*', permission='pull')
921 @wireprotocommand('getbundle', '*', permission='pull')
922 def getbundle(repo, proto, others):
922 def getbundle(repo, proto, others):
923 opts = options('getbundle', gboptsmap.keys(), others)
923 opts = options('getbundle', gboptsmap.keys(), others)
924 for k, v in opts.iteritems():
924 for k, v in opts.iteritems():
925 keytype = gboptsmap[k]
925 keytype = gboptsmap[k]
926 if keytype == 'nodes':
926 if keytype == 'nodes':
927 opts[k] = decodelist(v)
927 opts[k] = decodelist(v)
928 elif keytype == 'csv':
928 elif keytype == 'csv':
929 opts[k] = list(v.split(','))
929 opts[k] = list(v.split(','))
930 elif keytype == 'scsv':
930 elif keytype == 'scsv':
931 opts[k] = set(v.split(','))
931 opts[k] = set(v.split(','))
932 elif keytype == 'boolean':
932 elif keytype == 'boolean':
933 # Client should serialize False as '0', which is a non-empty string
933 # Client should serialize False as '0', which is a non-empty string
934 # so it evaluates as a True bool.
934 # so it evaluates as a True bool.
935 if v == '0':
935 if v == '0':
936 opts[k] = False
936 opts[k] = False
937 else:
937 else:
938 opts[k] = bool(v)
938 opts[k] = bool(v)
939 elif keytype != 'plain':
939 elif keytype != 'plain':
940 raise KeyError('unknown getbundle option type %s'
940 raise KeyError('unknown getbundle option type %s'
941 % keytype)
941 % keytype)
942
942
943 if not bundle1allowed(repo, 'pull'):
943 if not bundle1allowed(repo, 'pull'):
944 if not exchange.bundle2requested(opts.get('bundlecaps')):
944 if not exchange.bundle2requested(opts.get('bundlecaps')):
945 if proto.name == 'http-v1':
945 if proto.name == 'http-v1':
946 return wireprototypes.ooberror(bundle2required)
946 return wireprototypes.ooberror(bundle2required)
947 raise error.Abort(bundle2requiredmain,
947 raise error.Abort(bundle2requiredmain,
948 hint=bundle2requiredhint)
948 hint=bundle2requiredhint)
949
949
950 prefercompressed = True
950 prefercompressed = True
951
951
952 try:
952 try:
953 if repo.ui.configbool('server', 'disablefullbundle'):
953 if repo.ui.configbool('server', 'disablefullbundle'):
954 # Check to see if this is a full clone.
954 # Check to see if this is a full clone.
955 clheads = set(repo.changelog.heads())
955 clheads = set(repo.changelog.heads())
956 changegroup = opts.get('cg', True)
956 changegroup = opts.get('cg', True)
957 heads = set(opts.get('heads', set()))
957 heads = set(opts.get('heads', set()))
958 common = set(opts.get('common', set()))
958 common = set(opts.get('common', set()))
959 common.discard(nullid)
959 common.discard(nullid)
960 if changegroup and not common and clheads == heads:
960 if changegroup and not common and clheads == heads:
961 raise error.Abort(
961 raise error.Abort(
962 _('server has pull-based clones disabled'),
962 _('server has pull-based clones disabled'),
963 hint=_('remove --pull if specified or upgrade Mercurial'))
963 hint=_('remove --pull if specified or upgrade Mercurial'))
964
964
965 info, chunks = exchange.getbundlechunks(repo, 'serve',
965 info, chunks = exchange.getbundlechunks(repo, 'serve',
966 **pycompat.strkwargs(opts))
966 **pycompat.strkwargs(opts))
967 prefercompressed = info.get('prefercompressed', True)
967 prefercompressed = info.get('prefercompressed', True)
968 except error.Abort as exc:
968 except error.Abort as exc:
969 # cleanly forward Abort error to the client
969 # cleanly forward Abort error to the client
970 if not exchange.bundle2requested(opts.get('bundlecaps')):
970 if not exchange.bundle2requested(opts.get('bundlecaps')):
971 if proto.name == 'http-v1':
971 if proto.name == 'http-v1':
972 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
972 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
973 raise # cannot do better for bundle1 + ssh
973 raise # cannot do better for bundle1 + ssh
974 # bundle2 request expect a bundle2 reply
974 # bundle2 request expect a bundle2 reply
975 bundler = bundle2.bundle20(repo.ui)
975 bundler = bundle2.bundle20(repo.ui)
976 manargs = [('message', pycompat.bytestr(exc))]
976 manargs = [('message', pycompat.bytestr(exc))]
977 advargs = []
977 advargs = []
978 if exc.hint is not None:
978 if exc.hint is not None:
979 advargs.append(('hint', exc.hint))
979 advargs.append(('hint', exc.hint))
980 bundler.addpart(bundle2.bundlepart('error:abort',
980 bundler.addpart(bundle2.bundlepart('error:abort',
981 manargs, advargs))
981 manargs, advargs))
982 chunks = bundler.getchunks()
982 chunks = bundler.getchunks()
983 prefercompressed = False
983 prefercompressed = False
984
984
985 return wireprototypes.streamres(
985 return wireprototypes.streamres(
986 gen=chunks, prefer_uncompressed=not prefercompressed)
986 gen=chunks, prefer_uncompressed=not prefercompressed)
987
987
988 @wireprotocommand('heads', permission='pull')
988 @wireprotocommand('heads', permission='pull')
989 def heads(repo, proto):
989 def heads(repo, proto):
990 h = repo.heads()
990 h = repo.heads()
991 return wireprototypes.bytesresponse(encodelist(h) + '\n')
991 return wireprototypes.bytesresponse(encodelist(h) + '\n')
992
992
993 @wireprotocommand('hello', permission='pull')
993 @wireprotocommand('hello', permission='pull')
994 def hello(repo, proto):
994 def hello(repo, proto):
995 """Called as part of SSH handshake to obtain server info.
995 """Called as part of SSH handshake to obtain server info.
996
996
997 Returns a list of lines describing interesting things about the
997 Returns a list of lines describing interesting things about the
998 server, in an RFC822-like format.
998 server, in an RFC822-like format.
999
999
1000 Currently, the only one defined is ``capabilities``, which consists of a
1000 Currently, the only one defined is ``capabilities``, which consists of a
1001 line of space separated tokens describing server abilities:
1001 line of space separated tokens describing server abilities:
1002
1002
1003 capabilities: <token0> <token1> <token2>
1003 capabilities: <token0> <token1> <token2>
1004 """
1004 """
1005 caps = capabilities(repo, proto).data
1005 caps = capabilities(repo, proto).data
1006 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
1006 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
1007
1007
1008 @wireprotocommand('listkeys', 'namespace', permission='pull')
1008 @wireprotocommand('listkeys', 'namespace', permission='pull')
1009 def listkeys(repo, proto, namespace):
1009 def listkeys(repo, proto, namespace):
1010 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
1010 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
1011 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
1011 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
1012
1012
1013 @wireprotocommand('lookup', 'key', permission='pull')
1013 @wireprotocommand('lookup', 'key', permission='pull')
1014 def lookup(repo, proto, key):
1014 def lookup(repo, proto, key):
1015 try:
1015 try:
1016 k = encoding.tolocal(key)
1016 k = encoding.tolocal(key)
1017 c = repo[k]
1017 n = repo.lookup(k)
1018 r = c.hex()
1018 r = hex(n)
1019 success = 1
1019 success = 1
1020 except Exception as inst:
1020 except Exception as inst:
1021 r = stringutil.forcebytestr(inst)
1021 r = stringutil.forcebytestr(inst)
1022 success = 0
1022 success = 0
1023 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
1023 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
1024
1024
1025 @wireprotocommand('known', 'nodes *', permission='pull')
1025 @wireprotocommand('known', 'nodes *', permission='pull')
1026 def known(repo, proto, nodes, others):
1026 def known(repo, proto, nodes, others):
1027 v = ''.join(b and '1' or '0' for b in repo.known(decodelist(nodes)))
1027 v = ''.join(b and '1' or '0' for b in repo.known(decodelist(nodes)))
1028 return wireprototypes.bytesresponse(v)
1028 return wireprototypes.bytesresponse(v)
1029
1029
1030 @wireprotocommand('pushkey', 'namespace key old new', permission='push')
1030 @wireprotocommand('pushkey', 'namespace key old new', permission='push')
1031 def pushkey(repo, proto, namespace, key, old, new):
1031 def pushkey(repo, proto, namespace, key, old, new):
1032 # compatibility with pre-1.8 clients which were accidentally
1032 # compatibility with pre-1.8 clients which were accidentally
1033 # sending raw binary nodes rather than utf-8-encoded hex
1033 # sending raw binary nodes rather than utf-8-encoded hex
1034 if len(new) == 20 and stringutil.escapestr(new) != new:
1034 if len(new) == 20 and stringutil.escapestr(new) != new:
1035 # looks like it could be a binary node
1035 # looks like it could be a binary node
1036 try:
1036 try:
1037 new.decode('utf-8')
1037 new.decode('utf-8')
1038 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
1038 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
1039 except UnicodeDecodeError:
1039 except UnicodeDecodeError:
1040 pass # binary, leave unmodified
1040 pass # binary, leave unmodified
1041 else:
1041 else:
1042 new = encoding.tolocal(new) # normal path
1042 new = encoding.tolocal(new) # normal path
1043
1043
1044 with proto.mayberedirectstdio() as output:
1044 with proto.mayberedirectstdio() as output:
1045 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
1045 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
1046 encoding.tolocal(old), new) or False
1046 encoding.tolocal(old), new) or False
1047
1047
1048 output = output.getvalue() if output else ''
1048 output = output.getvalue() if output else ''
1049 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
1049 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
1050
1050
1051 @wireprotocommand('stream_out', permission='pull')
1051 @wireprotocommand('stream_out', permission='pull')
1052 def stream(repo, proto):
1052 def stream(repo, proto):
1053 '''If the server supports streaming clone, it advertises the "stream"
1053 '''If the server supports streaming clone, it advertises the "stream"
1054 capability with a value representing the version and flags of the repo
1054 capability with a value representing the version and flags of the repo
1055 it is serving. Client checks to see if it understands the format.
1055 it is serving. Client checks to see if it understands the format.
1056 '''
1056 '''
1057 return wireprototypes.streamreslegacy(
1057 return wireprototypes.streamreslegacy(
1058 streamclone.generatev1wireproto(repo))
1058 streamclone.generatev1wireproto(repo))
1059
1059
1060 @wireprotocommand('unbundle', 'heads', permission='push')
1060 @wireprotocommand('unbundle', 'heads', permission='push')
1061 def unbundle(repo, proto, heads):
1061 def unbundle(repo, proto, heads):
1062 their_heads = decodelist(heads)
1062 their_heads = decodelist(heads)
1063
1063
1064 with proto.mayberedirectstdio() as output:
1064 with proto.mayberedirectstdio() as output:
1065 try:
1065 try:
1066 exchange.check_heads(repo, their_heads, 'preparing changes')
1066 exchange.check_heads(repo, their_heads, 'preparing changes')
1067
1067
1068 # write bundle data to temporary file because it can be big
1068 # write bundle data to temporary file because it can be big
1069 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1069 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1070 fp = os.fdopen(fd, r'wb+')
1070 fp = os.fdopen(fd, r'wb+')
1071 r = 0
1071 r = 0
1072 try:
1072 try:
1073 proto.forwardpayload(fp)
1073 proto.forwardpayload(fp)
1074 fp.seek(0)
1074 fp.seek(0)
1075 gen = exchange.readbundle(repo.ui, fp, None)
1075 gen = exchange.readbundle(repo.ui, fp, None)
1076 if (isinstance(gen, changegroupmod.cg1unpacker)
1076 if (isinstance(gen, changegroupmod.cg1unpacker)
1077 and not bundle1allowed(repo, 'push')):
1077 and not bundle1allowed(repo, 'push')):
1078 if proto.name == 'http-v1':
1078 if proto.name == 'http-v1':
1079 # need to special case http because stderr do not get to
1079 # need to special case http because stderr do not get to
1080 # the http client on failed push so we need to abuse
1080 # the http client on failed push so we need to abuse
1081 # some other error type to make sure the message get to
1081 # some other error type to make sure the message get to
1082 # the user.
1082 # the user.
1083 return wireprototypes.ooberror(bundle2required)
1083 return wireprototypes.ooberror(bundle2required)
1084 raise error.Abort(bundle2requiredmain,
1084 raise error.Abort(bundle2requiredmain,
1085 hint=bundle2requiredhint)
1085 hint=bundle2requiredhint)
1086
1086
1087 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1087 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1088 proto.client())
1088 proto.client())
1089 if util.safehasattr(r, 'addpart'):
1089 if util.safehasattr(r, 'addpart'):
1090 # The return looks streamable, we are in the bundle2 case
1090 # The return looks streamable, we are in the bundle2 case
1091 # and should return a stream.
1091 # and should return a stream.
1092 return wireprototypes.streamreslegacy(gen=r.getchunks())
1092 return wireprototypes.streamreslegacy(gen=r.getchunks())
1093 return wireprototypes.pushres(
1093 return wireprototypes.pushres(
1094 r, output.getvalue() if output else '')
1094 r, output.getvalue() if output else '')
1095
1095
1096 finally:
1096 finally:
1097 fp.close()
1097 fp.close()
1098 os.unlink(tempname)
1098 os.unlink(tempname)
1099
1099
1100 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1100 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1101 # handle non-bundle2 case first
1101 # handle non-bundle2 case first
1102 if not getattr(exc, 'duringunbundle2', False):
1102 if not getattr(exc, 'duringunbundle2', False):
1103 try:
1103 try:
1104 raise
1104 raise
1105 except error.Abort:
1105 except error.Abort:
1106 # The old code we moved used procutil.stderr directly.
1106 # The old code we moved used procutil.stderr directly.
1107 # We did not change it to minimise code change.
1107 # We did not change it to minimise code change.
1108 # This need to be moved to something proper.
1108 # This need to be moved to something proper.
1109 # Feel free to do it.
1109 # Feel free to do it.
1110 procutil.stderr.write("abort: %s\n" % exc)
1110 procutil.stderr.write("abort: %s\n" % exc)
1111 if exc.hint is not None:
1111 if exc.hint is not None:
1112 procutil.stderr.write("(%s)\n" % exc.hint)
1112 procutil.stderr.write("(%s)\n" % exc.hint)
1113 procutil.stderr.flush()
1113 procutil.stderr.flush()
1114 return wireprototypes.pushres(
1114 return wireprototypes.pushres(
1115 0, output.getvalue() if output else '')
1115 0, output.getvalue() if output else '')
1116 except error.PushRaced:
1116 except error.PushRaced:
1117 return wireprototypes.pusherr(
1117 return wireprototypes.pusherr(
1118 pycompat.bytestr(exc),
1118 pycompat.bytestr(exc),
1119 output.getvalue() if output else '')
1119 output.getvalue() if output else '')
1120
1120
1121 bundler = bundle2.bundle20(repo.ui)
1121 bundler = bundle2.bundle20(repo.ui)
1122 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1122 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1123 bundler.addpart(out)
1123 bundler.addpart(out)
1124 try:
1124 try:
1125 try:
1125 try:
1126 raise
1126 raise
1127 except error.PushkeyFailed as exc:
1127 except error.PushkeyFailed as exc:
1128 # check client caps
1128 # check client caps
1129 remotecaps = getattr(exc, '_replycaps', None)
1129 remotecaps = getattr(exc, '_replycaps', None)
1130 if (remotecaps is not None
1130 if (remotecaps is not None
1131 and 'pushkey' not in remotecaps.get('error', ())):
1131 and 'pushkey' not in remotecaps.get('error', ())):
1132 # no support remote side, fallback to Abort handler.
1132 # no support remote side, fallback to Abort handler.
1133 raise
1133 raise
1134 part = bundler.newpart('error:pushkey')
1134 part = bundler.newpart('error:pushkey')
1135 part.addparam('in-reply-to', exc.partid)
1135 part.addparam('in-reply-to', exc.partid)
1136 if exc.namespace is not None:
1136 if exc.namespace is not None:
1137 part.addparam('namespace', exc.namespace,
1137 part.addparam('namespace', exc.namespace,
1138 mandatory=False)
1138 mandatory=False)
1139 if exc.key is not None:
1139 if exc.key is not None:
1140 part.addparam('key', exc.key, mandatory=False)
1140 part.addparam('key', exc.key, mandatory=False)
1141 if exc.new is not None:
1141 if exc.new is not None:
1142 part.addparam('new', exc.new, mandatory=False)
1142 part.addparam('new', exc.new, mandatory=False)
1143 if exc.old is not None:
1143 if exc.old is not None:
1144 part.addparam('old', exc.old, mandatory=False)
1144 part.addparam('old', exc.old, mandatory=False)
1145 if exc.ret is not None:
1145 if exc.ret is not None:
1146 part.addparam('ret', exc.ret, mandatory=False)
1146 part.addparam('ret', exc.ret, mandatory=False)
1147 except error.BundleValueError as exc:
1147 except error.BundleValueError as exc:
1148 errpart = bundler.newpart('error:unsupportedcontent')
1148 errpart = bundler.newpart('error:unsupportedcontent')
1149 if exc.parttype is not None:
1149 if exc.parttype is not None:
1150 errpart.addparam('parttype', exc.parttype)
1150 errpart.addparam('parttype', exc.parttype)
1151 if exc.params:
1151 if exc.params:
1152 errpart.addparam('params', '\0'.join(exc.params))
1152 errpart.addparam('params', '\0'.join(exc.params))
1153 except error.Abort as exc:
1153 except error.Abort as exc:
1154 manargs = [('message', stringutil.forcebytestr(exc))]
1154 manargs = [('message', stringutil.forcebytestr(exc))]
1155 advargs = []
1155 advargs = []
1156 if exc.hint is not None:
1156 if exc.hint is not None:
1157 advargs.append(('hint', exc.hint))
1157 advargs.append(('hint', exc.hint))
1158 bundler.addpart(bundle2.bundlepart('error:abort',
1158 bundler.addpart(bundle2.bundlepart('error:abort',
1159 manargs, advargs))
1159 manargs, advargs))
1160 except error.PushRaced as exc:
1160 except error.PushRaced as exc:
1161 bundler.newpart('error:pushraced',
1161 bundler.newpart('error:pushraced',
1162 [('message', stringutil.forcebytestr(exc))])
1162 [('message', stringutil.forcebytestr(exc))])
1163 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
1163 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now