##// END OF EJS Templates
wireproto: move version 1 peer functionality to standalone module (API)...
Gregory Szorc -
r37632:a81d02ea default
parent child Browse files
Show More
@@ -128,6 +128,7 b' from mercurial import ('
128 util,
128 util,
129 wireproto,
129 wireproto,
130 wireprototypes,
130 wireprototypes,
131 wireprotov1peer,
131 )
132 )
132
133
133 from . import (
134 from . import (
@@ -319,7 +320,7 b' def clientextsetup(ui):'
319
320
320 extensions.wrapfunction(discovery, 'checkheads', _checkheads)
321 extensions.wrapfunction(discovery, 'checkheads', _checkheads)
321
322
322 wireproto.wirepeer.listkeyspatterns = listkeyspatterns
323 wireprotov1peer.wirepeer.listkeyspatterns = listkeyspatterns
323
324
324 partorder = exchange.b2partsgenorder
325 partorder = exchange.b2partsgenorder
325 index = partorder.index('changeset')
326 index = partorder.index('changeset')
@@ -13,8 +13,8 b' from mercurial import ('
13 error,
13 error,
14 httppeer,
14 httppeer,
15 util,
15 util,
16 wireproto,
17 wireprototypes,
16 wireprototypes,
17 wireprotov1peer,
18 )
18 )
19
19
20 from . import (
20 from . import (
@@ -145,9 +145,9 b' def wirereposetup(ui, repo):'
145 self._abort(error.ResponseError(_("unexpected response:"),
145 self._abort(error.ResponseError(_("unexpected response:"),
146 chunk))
146 chunk))
147
147
148 @wireproto.batchable
148 @wireprotov1peer.batchable
149 def statlfile(self, sha):
149 def statlfile(self, sha):
150 f = wireproto.future()
150 f = wireprotov1peer.future()
151 result = {'sha': sha}
151 result = {'sha': sha}
152 yield result, f
152 yield result, f
153 try:
153 try:
@@ -34,6 +34,7 b' from . import ('
34 wireproto,
34 wireproto,
35 wireprotoframing,
35 wireprotoframing,
36 wireprototypes,
36 wireprototypes,
37 wireprotov1peer,
37 wireprotov2server,
38 wireprotov2server,
38 )
39 )
39
40
@@ -382,7 +383,7 b' def parsev1commandresponse(ui, baseurl, '
382
383
383 return respurl, proto, resp
384 return respurl, proto, resp
384
385
385 class httppeer(wireproto.wirepeer):
386 class httppeer(wireprotov1peer.wirepeer):
386 def __init__(self, ui, path, url, opener, requestbuilder, caps):
387 def __init__(self, ui, path, url, opener, requestbuilder, caps):
387 self.ui = ui
388 self.ui = ui
388 self._path = path
389 self._path = path
@@ -18,6 +18,7 b' from . import ('
18 wireproto,
18 wireproto,
19 wireprotoserver,
19 wireprotoserver,
20 wireprototypes,
20 wireprototypes,
21 wireprotov1peer,
21 )
22 )
22 from .utils import (
23 from .utils import (
23 procutil,
24 procutil,
@@ -352,7 +353,7 b' def _performhandshake(ui, stdin, stdout,'
352
353
353 return protoname, caps
354 return protoname, caps
354
355
355 class sshv1peer(wireproto.wirepeer):
356 class sshv1peer(wireprotov1peer.wirepeer):
356 def __init__(self, ui, url, proc, stdin, stdout, stderr, caps,
357 def __init__(self, ui, url, proc, stdin, stdout, stderr, caps,
357 autoreadstderr=True):
358 autoreadstderr=True):
358 """Create a peer from an existing SSH connection.
359 """Create a peer from an existing SSH connection.
@@ -589,7 +590,7 b' def makepeer(ui, path, proc, stdin, stdo'
589 def instance(ui, path, create):
590 def instance(ui, path, create):
590 """Create an SSH peer.
591 """Create an SSH peer.
591
592
592 The returned object conforms to the ``wireproto.wirepeer`` interface.
593 The returned object conforms to the ``wireprotov1peer.wirepeer`` interface.
593 """
594 """
594 u = util.url(path, parsequery=False, parsefragment=False)
595 u = util.url(path, parsequery=False, parsefragment=False)
595 if u.scheme != 'ssh' or not u.host or u.path is None:
596 if u.scheme != 'ssh' or not u.host or u.path is None:
@@ -7,13 +7,11 b''
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
11 import os
10 import os
12 import tempfile
11 import tempfile
13
12
14 from .i18n import _
13 from .i18n import _
15 from .node import (
14 from .node import (
16 bin,
17 hex,
15 hex,
18 nullid,
16 nullid,
19 )
17 )
@@ -25,10 +23,8 b' from . import ('
25 encoding,
23 encoding,
26 error,
24 error,
27 exchange,
25 exchange,
28 peer,
29 pushkey as pushkeymod,
26 pushkey as pushkeymod,
30 pycompat,
27 pycompat,
31 repository,
32 streamclone,
28 streamclone,
33 util,
29 util,
34 wireprototypes,
30 wireprototypes,
@@ -47,92 +43,6 b" bundle2requiredhint = _('see https://www"
47 'IncompatibleClient')
43 'IncompatibleClient')
48 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
44 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
49
45
50 class remoteiterbatcher(peer.iterbatcher):
51 def __init__(self, remote):
52 super(remoteiterbatcher, self).__init__()
53 self._remote = remote
54
55 def __getattr__(self, name):
56 # Validate this method is batchable, since submit() only supports
57 # batchable methods.
58 fn = getattr(self._remote, name)
59 if not getattr(fn, 'batchable', None):
60 raise error.ProgrammingError('Attempted to batch a non-batchable '
61 'call to %r' % name)
62
63 return super(remoteiterbatcher, self).__getattr__(name)
64
65 def submit(self):
66 """Break the batch request into many patch calls and pipeline them.
67
68 This is mostly valuable over http where request sizes can be
69 limited, but can be used in other places as well.
70 """
71 # 2-tuple of (command, arguments) that represents what will be
72 # sent over the wire.
73 requests = []
74
75 # 4-tuple of (command, final future, @batchable generator, remote
76 # future).
77 results = []
78
79 for command, args, opts, finalfuture in self.calls:
80 mtd = getattr(self._remote, command)
81 batchable = mtd.batchable(mtd.__self__, *args, **opts)
82
83 commandargs, fremote = next(batchable)
84 assert fremote
85 requests.append((command, commandargs))
86 results.append((command, finalfuture, batchable, fremote))
87
88 if requests:
89 self._resultiter = self._remote._submitbatch(requests)
90
91 self._results = results
92
93 def results(self):
94 for command, finalfuture, batchable, remotefuture in self._results:
95 # Get the raw result, set it in the remote future, feed it
96 # back into the @batchable generator so it can be decoded, and
97 # set the result on the final future to this value.
98 remoteresult = next(self._resultiter)
99 remotefuture.set(remoteresult)
100 finalfuture.set(next(batchable))
101
102 # Verify our @batchable generators only emit 2 values.
103 try:
104 next(batchable)
105 except StopIteration:
106 pass
107 else:
108 raise error.ProgrammingError('%s @batchable generator emitted '
109 'unexpected value count' % command)
110
111 yield finalfuture.value
112
113 # Forward a couple of names from peer to make wireproto interactions
114 # slightly more sensible.
115 batchable = peer.batchable
116 future = peer.future
117
118
119 def encodebatchcmds(req):
120 """Return a ``cmds`` argument value for the ``batch`` command."""
121 escapearg = wireprototypes.escapebatcharg
122
123 cmds = []
124 for op, argsdict in req:
125 # Old servers didn't properly unescape argument names. So prevent
126 # the sending of argument names that may not be decoded properly by
127 # servers.
128 assert all(escapearg(k) == k for k in argsdict)
129
130 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
131 for k, v in argsdict.iteritems())
132 cmds.append('%s %s' % (op, args))
133
134 return ';'.join(cmds)
135
136 def clientcompressionsupport(proto):
46 def clientcompressionsupport(proto):
137 """Returns a list of compression methods supported by the client.
47 """Returns a list of compression methods supported by the client.
138
48
@@ -145,315 +55,6 b' def clientcompressionsupport(proto):'
145 return cap[5:].split(',')
55 return cap[5:].split(',')
146 return ['zlib', 'none']
56 return ['zlib', 'none']
147
57
148 # client side
149
150 class wirepeer(repository.legacypeer):
151 """Client-side interface for communicating with a peer repository.
152
153 Methods commonly call wire protocol commands of the same name.
154
155 See also httppeer.py and sshpeer.py for protocol-specific
156 implementations of this interface.
157 """
158 # Begin of ipeercommands interface.
159
160 def iterbatch(self):
161 return remoteiterbatcher(self)
162
163 @batchable
164 def lookup(self, key):
165 self.requirecap('lookup', _('look up remote revision'))
166 f = future()
167 yield {'key': encoding.fromlocal(key)}, f
168 d = f.value
169 success, data = d[:-1].split(" ", 1)
170 if int(success):
171 yield bin(data)
172 else:
173 self._abort(error.RepoError(data))
174
175 @batchable
176 def heads(self):
177 f = future()
178 yield {}, f
179 d = f.value
180 try:
181 yield wireprototypes.decodelist(d[:-1])
182 except ValueError:
183 self._abort(error.ResponseError(_("unexpected response:"), d))
184
185 @batchable
186 def known(self, nodes):
187 f = future()
188 yield {'nodes': wireprototypes.encodelist(nodes)}, f
189 d = f.value
190 try:
191 yield [bool(int(b)) for b in d]
192 except ValueError:
193 self._abort(error.ResponseError(_("unexpected response:"), d))
194
195 @batchable
196 def branchmap(self):
197 f = future()
198 yield {}, f
199 d = f.value
200 try:
201 branchmap = {}
202 for branchpart in d.splitlines():
203 branchname, branchheads = branchpart.split(' ', 1)
204 branchname = encoding.tolocal(urlreq.unquote(branchname))
205 branchheads = wireprototypes.decodelist(branchheads)
206 branchmap[branchname] = branchheads
207 yield branchmap
208 except TypeError:
209 self._abort(error.ResponseError(_("unexpected response:"), d))
210
211 @batchable
212 def listkeys(self, namespace):
213 if not self.capable('pushkey'):
214 yield {}, None
215 f = future()
216 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
217 yield {'namespace': encoding.fromlocal(namespace)}, f
218 d = f.value
219 self.ui.debug('received listkey for "%s": %i bytes\n'
220 % (namespace, len(d)))
221 yield pushkeymod.decodekeys(d)
222
223 @batchable
224 def pushkey(self, namespace, key, old, new):
225 if not self.capable('pushkey'):
226 yield False, None
227 f = future()
228 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
229 yield {'namespace': encoding.fromlocal(namespace),
230 'key': encoding.fromlocal(key),
231 'old': encoding.fromlocal(old),
232 'new': encoding.fromlocal(new)}, f
233 d = f.value
234 d, output = d.split('\n', 1)
235 try:
236 d = bool(int(d))
237 except ValueError:
238 raise error.ResponseError(
239 _('push failed (unexpected response):'), d)
240 for l in output.splitlines(True):
241 self.ui.status(_('remote: '), l)
242 yield d
243
244 def stream_out(self):
245 return self._callstream('stream_out')
246
247 def getbundle(self, source, **kwargs):
248 kwargs = pycompat.byteskwargs(kwargs)
249 self.requirecap('getbundle', _('look up remote changes'))
250 opts = {}
251 bundlecaps = kwargs.get('bundlecaps') or set()
252 for key, value in kwargs.iteritems():
253 if value is None:
254 continue
255 keytype = wireprototypes.GETBUNDLE_ARGUMENTS.get(key)
256 if keytype is None:
257 raise error.ProgrammingError(
258 'Unexpectedly None keytype for key %s' % key)
259 elif keytype == 'nodes':
260 value = wireprototypes.encodelist(value)
261 elif keytype == 'csv':
262 value = ','.join(value)
263 elif keytype == 'scsv':
264 value = ','.join(sorted(value))
265 elif keytype == 'boolean':
266 value = '%i' % bool(value)
267 elif keytype != 'plain':
268 raise KeyError('unknown getbundle option type %s'
269 % keytype)
270 opts[key] = value
271 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
272 if any((cap.startswith('HG2') for cap in bundlecaps)):
273 return bundle2.getunbundler(self.ui, f)
274 else:
275 return changegroupmod.cg1unpacker(f, 'UN')
276
277 def unbundle(self, cg, heads, url):
278 '''Send cg (a readable file-like object representing the
279 changegroup to push, typically a chunkbuffer object) to the
280 remote server as a bundle.
281
282 When pushing a bundle10 stream, return an integer indicating the
283 result of the push (see changegroup.apply()).
284
285 When pushing a bundle20 stream, return a bundle20 stream.
286
287 `url` is the url the client thinks it's pushing to, which is
288 visible to hooks.
289 '''
290
291 if heads != ['force'] and self.capable('unbundlehash'):
292 heads = wireprototypes.encodelist(
293 ['hashed', hashlib.sha1(''.join(sorted(heads))).digest()])
294 else:
295 heads = wireprototypes.encodelist(heads)
296
297 if util.safehasattr(cg, 'deltaheader'):
298 # this a bundle10, do the old style call sequence
299 ret, output = self._callpush("unbundle", cg, heads=heads)
300 if ret == "":
301 raise error.ResponseError(
302 _('push failed:'), output)
303 try:
304 ret = int(ret)
305 except ValueError:
306 raise error.ResponseError(
307 _('push failed (unexpected response):'), ret)
308
309 for l in output.splitlines(True):
310 self.ui.status(_('remote: '), l)
311 else:
312 # bundle2 push. Send a stream, fetch a stream.
313 stream = self._calltwowaystream('unbundle', cg, heads=heads)
314 ret = bundle2.getunbundler(self.ui, stream)
315 return ret
316
317 # End of ipeercommands interface.
318
319 # Begin of ipeerlegacycommands interface.
320
321 def branches(self, nodes):
322 n = wireprototypes.encodelist(nodes)
323 d = self._call("branches", nodes=n)
324 try:
325 br = [tuple(wireprototypes.decodelist(b)) for b in d.splitlines()]
326 return br
327 except ValueError:
328 self._abort(error.ResponseError(_("unexpected response:"), d))
329
330 def between(self, pairs):
331 batch = 8 # avoid giant requests
332 r = []
333 for i in xrange(0, len(pairs), batch):
334 n = " ".join([wireprototypes.encodelist(p, '-')
335 for p in pairs[i:i + batch]])
336 d = self._call("between", pairs=n)
337 try:
338 r.extend(l and wireprototypes.decodelist(l) or []
339 for l in d.splitlines())
340 except ValueError:
341 self._abort(error.ResponseError(_("unexpected response:"), d))
342 return r
343
344 def changegroup(self, nodes, kind):
345 n = wireprototypes.encodelist(nodes)
346 f = self._callcompressable("changegroup", roots=n)
347 return changegroupmod.cg1unpacker(f, 'UN')
348
349 def changegroupsubset(self, bases, heads, kind):
350 self.requirecap('changegroupsubset', _('look up remote changes'))
351 bases = wireprototypes.encodelist(bases)
352 heads = wireprototypes.encodelist(heads)
353 f = self._callcompressable("changegroupsubset",
354 bases=bases, heads=heads)
355 return changegroupmod.cg1unpacker(f, 'UN')
356
357 # End of ipeerlegacycommands interface.
358
359 def _submitbatch(self, req):
360 """run batch request <req> on the server
361
362 Returns an iterator of the raw responses from the server.
363 """
364 ui = self.ui
365 if ui.debugflag and ui.configbool('devel', 'debug.peer-request'):
366 ui.debug('devel-peer-request: batched-content\n')
367 for op, args in req:
368 msg = 'devel-peer-request: - %s (%d arguments)\n'
369 ui.debug(msg % (op, len(args)))
370
371 unescapearg = wireprototypes.unescapebatcharg
372
373 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
374 chunk = rsp.read(1024)
375 work = [chunk]
376 while chunk:
377 while ';' not in chunk and chunk:
378 chunk = rsp.read(1024)
379 work.append(chunk)
380 merged = ''.join(work)
381 while ';' in merged:
382 one, merged = merged.split(';', 1)
383 yield unescapearg(one)
384 chunk = rsp.read(1024)
385 work = [merged, chunk]
386 yield unescapearg(''.join(work))
387
388 def _submitone(self, op, args):
389 return self._call(op, **pycompat.strkwargs(args))
390
391 def debugwireargs(self, one, two, three=None, four=None, five=None):
392 # don't pass optional arguments left at their default value
393 opts = {}
394 if three is not None:
395 opts[r'three'] = three
396 if four is not None:
397 opts[r'four'] = four
398 return self._call('debugwireargs', one=one, two=two, **opts)
399
400 def _call(self, cmd, **args):
401 """execute <cmd> on the server
402
403 The command is expected to return a simple string.
404
405 returns the server reply as a string."""
406 raise NotImplementedError()
407
408 def _callstream(self, cmd, **args):
409 """execute <cmd> on the server
410
411 The command is expected to return a stream. Note that if the
412 command doesn't return a stream, _callstream behaves
413 differently for ssh and http peers.
414
415 returns the server reply as a file like object.
416 """
417 raise NotImplementedError()
418
419 def _callcompressable(self, cmd, **args):
420 """execute <cmd> on the server
421
422 The command is expected to return a stream.
423
424 The stream may have been compressed in some implementations. This
425 function takes care of the decompression. This is the only difference
426 with _callstream.
427
428 returns the server reply as a file like object.
429 """
430 raise NotImplementedError()
431
432 def _callpush(self, cmd, fp, **args):
433 """execute a <cmd> on server
434
435 The command is expected to be related to a push. Push has a special
436 return method.
437
438 returns the server reply as a (ret, output) tuple. ret is either
439 empty (error) or a stringified int.
440 """
441 raise NotImplementedError()
442
443 def _calltwowaystream(self, cmd, fp, **args):
444 """execute <cmd> on server
445
446 The command will send a stream to the server and get a stream in reply.
447 """
448 raise NotImplementedError()
449
450 def _abort(self, exception):
451 """clearly abort the wire protocol connection and raise the exception
452 """
453 raise NotImplementedError()
454
455 # server side
456
457 # wire protocol command can either return a string or one of these classes.
58 # wire protocol command can either return a string or one of these classes.
458
59
459 def getdispatchrepo(repo, proto, command):
60 def getdispatchrepo(repo, proto, command):
This diff has been collapsed as it changes many lines, (847 lines changed) Show them Hide them
@@ -1,4 +1,4 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireprotov1peer.py - Client-side functionality for wire protocol version 1.
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
@@ -8,45 +8,27 b''
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import os
12 import tempfile
13
11
14 from .i18n import _
12 from .i18n import _
15 from .node import (
13 from .node import (
16 bin,
14 bin,
17 hex,
18 nullid,
19 )
15 )
20
16
21 from . import (
17 from . import (
22 bundle2,
18 bundle2,
23 changegroup as changegroupmod,
19 changegroup as changegroupmod,
24 discovery,
25 encoding,
20 encoding,
26 error,
21 error,
27 exchange,
28 peer,
22 peer,
29 pushkey as pushkeymod,
23 pushkey as pushkeymod,
30 pycompat,
24 pycompat,
31 repository,
25 repository,
32 streamclone,
33 util,
26 util,
34 wireprototypes,
27 wireprototypes,
35 )
28 )
36
29
37 from .utils import (
38 procutil,
39 stringutil,
40 )
41
42 urlerr = util.urlerr
43 urlreq = util.urlreq
30 urlreq = util.urlreq
44
31
45 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
46 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
47 'IncompatibleClient')
48 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
49
50 class remoteiterbatcher(peer.iterbatcher):
32 class remoteiterbatcher(peer.iterbatcher):
51 def __init__(self, remote):
33 def __init__(self, remote):
52 super(remoteiterbatcher, self).__init__()
34 super(remoteiterbatcher, self).__init__()
@@ -115,7 +97,6 b' class remoteiterbatcher(peer.iterbatcher'
115 batchable = peer.batchable
97 batchable = peer.batchable
116 future = peer.future
98 future = peer.future
117
99
118
119 def encodebatchcmds(req):
100 def encodebatchcmds(req):
120 """Return a ``cmds`` argument value for the ``batch`` command."""
101 """Return a ``cmds`` argument value for the ``batch`` command."""
121 escapearg = wireprototypes.escapebatcharg
102 escapearg = wireprototypes.escapebatcharg
@@ -133,20 +114,6 b' def encodebatchcmds(req):'
133
114
134 return ';'.join(cmds)
115 return ';'.join(cmds)
135
116
136 def clientcompressionsupport(proto):
137 """Returns a list of compression methods supported by the client.
138
139 Returns a list of the compression methods supported by the client
140 according to the protocol capabilities. If no such capability has
141 been announced, fallback to the default of zlib and uncompressed.
142 """
143 for cap in proto.getprotocaps():
144 if cap.startswith('comp='):
145 return cap[5:].split(',')
146 return ['zlib', 'none']
147
148 # client side
149
150 class wirepeer(repository.legacypeer):
117 class wirepeer(repository.legacypeer):
151 """Client-side interface for communicating with a peer repository.
118 """Client-side interface for communicating with a peer repository.
152
119
@@ -451,815 +418,3 b' class wirepeer(repository.legacypeer):'
451 """clearly abort the wire protocol connection and raise the exception
418 """clearly abort the wire protocol connection and raise the exception
452 """
419 """
453 raise NotImplementedError()
420 raise NotImplementedError()
454
455 # server side
456
457 # wire protocol command can either return a string or one of these classes.
458
459 def getdispatchrepo(repo, proto, command):
460 """Obtain the repo used for processing wire protocol commands.
461
462 The intent of this function is to serve as a monkeypatch point for
463 extensions that need commands to operate on different repo views under
464 specialized circumstances.
465 """
466 return repo.filtered('served')
467
468 def dispatch(repo, proto, command):
469 repo = getdispatchrepo(repo, proto, command)
470
471 transportversion = wireprototypes.TRANSPORTS[proto.name]['version']
472 commandtable = commandsv2 if transportversion == 2 else commands
473 func, spec = commandtable[command]
474
475 args = proto.getargs(spec)
476
477 # Version 1 protocols define arguments as a list. Version 2 uses a dict.
478 if isinstance(args, list):
479 return func(repo, proto, *args)
480 elif isinstance(args, dict):
481 return func(repo, proto, **args)
482 else:
483 raise error.ProgrammingError('unexpected type returned from '
484 'proto.getargs(): %s' % type(args))
485
486 def options(cmd, keys, others):
487 opts = {}
488 for k in keys:
489 if k in others:
490 opts[k] = others[k]
491 del others[k]
492 if others:
493 procutil.stderr.write("warning: %s ignored unexpected arguments %s\n"
494 % (cmd, ",".join(others)))
495 return opts
496
497 def bundle1allowed(repo, action):
498 """Whether a bundle1 operation is allowed from the server.
499
500 Priority is:
501
502 1. server.bundle1gd.<action> (if generaldelta active)
503 2. server.bundle1.<action>
504 3. server.bundle1gd (if generaldelta active)
505 4. server.bundle1
506 """
507 ui = repo.ui
508 gd = 'generaldelta' in repo.requirements
509
510 if gd:
511 v = ui.configbool('server', 'bundle1gd.%s' % action)
512 if v is not None:
513 return v
514
515 v = ui.configbool('server', 'bundle1.%s' % action)
516 if v is not None:
517 return v
518
519 if gd:
520 v = ui.configbool('server', 'bundle1gd')
521 if v is not None:
522 return v
523
524 return ui.configbool('server', 'bundle1')
525
526 def supportedcompengines(ui, role):
527 """Obtain the list of supported compression engines for a request."""
528 assert role in (util.CLIENTROLE, util.SERVERROLE)
529
530 compengines = util.compengines.supportedwireengines(role)
531
532 # Allow config to override default list and ordering.
533 if role == util.SERVERROLE:
534 configengines = ui.configlist('server', 'compressionengines')
535 config = 'server.compressionengines'
536 else:
537 # This is currently implemented mainly to facilitate testing. In most
538 # cases, the server should be in charge of choosing a compression engine
539 # because a server has the most to lose from a sub-optimal choice. (e.g.
540 # CPU DoS due to an expensive engine or a network DoS due to poor
541 # compression ratio).
542 configengines = ui.configlist('experimental',
543 'clientcompressionengines')
544 config = 'experimental.clientcompressionengines'
545
546 # No explicit config. Filter out the ones that aren't supposed to be
547 # advertised and return default ordering.
548 if not configengines:
549 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
550 return [e for e in compengines
551 if getattr(e.wireprotosupport(), attr) > 0]
552
553 # If compression engines are listed in the config, assume there is a good
554 # reason for it (like server operators wanting to achieve specific
555 # performance characteristics). So fail fast if the config references
556 # unusable compression engines.
557 validnames = set(e.name() for e in compengines)
558 invalidnames = set(e for e in configengines if e not in validnames)
559 if invalidnames:
560 raise error.Abort(_('invalid compression engine defined in %s: %s') %
561 (config, ', '.join(sorted(invalidnames))))
562
563 compengines = [e for e in compengines if e.name() in configengines]
564 compengines = sorted(compengines,
565 key=lambda e: configengines.index(e.name()))
566
567 if not compengines:
568 raise error.Abort(_('%s config option does not specify any known '
569 'compression engines') % config,
570 hint=_('usable compression engines: %s') %
571 ', '.sorted(validnames))
572
573 return compengines
574
575 class commandentry(object):
576 """Represents a declared wire protocol command."""
577 def __init__(self, func, args='', transports=None,
578 permission='push'):
579 self.func = func
580 self.args = args
581 self.transports = transports or set()
582 self.permission = permission
583
584 def _merge(self, func, args):
585 """Merge this instance with an incoming 2-tuple.
586
587 This is called when a caller using the old 2-tuple API attempts
588 to replace an instance. The incoming values are merged with
589 data not captured by the 2-tuple and a new instance containing
590 the union of the two objects is returned.
591 """
592 return commandentry(func, args=args, transports=set(self.transports),
593 permission=self.permission)
594
595 # Old code treats instances as 2-tuples. So expose that interface.
596 def __iter__(self):
597 yield self.func
598 yield self.args
599
600 def __getitem__(self, i):
601 if i == 0:
602 return self.func
603 elif i == 1:
604 return self.args
605 else:
606 raise IndexError('can only access elements 0 and 1')
607
608 class commanddict(dict):
609 """Container for registered wire protocol commands.
610
611 It behaves like a dict. But __setitem__ is overwritten to allow silent
612 coercion of values from 2-tuples for API compatibility.
613 """
614 def __setitem__(self, k, v):
615 if isinstance(v, commandentry):
616 pass
617 # Cast 2-tuples to commandentry instances.
618 elif isinstance(v, tuple):
619 if len(v) != 2:
620 raise ValueError('command tuples must have exactly 2 elements')
621
622 # It is common for extensions to wrap wire protocol commands via
623 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
624 # doing this aren't aware of the new API that uses objects to store
625 # command entries, we automatically merge old state with new.
626 if k in self:
627 v = self[k]._merge(v[0], v[1])
628 else:
629 # Use default values from @wireprotocommand.
630 v = commandentry(v[0], args=v[1],
631 transports=set(wireprototypes.TRANSPORTS),
632 permission='push')
633 else:
634 raise ValueError('command entries must be commandentry instances '
635 'or 2-tuples')
636
637 return super(commanddict, self).__setitem__(k, v)
638
639 def commandavailable(self, command, proto):
640 """Determine if a command is available for the requested protocol."""
641 assert proto.name in wireprototypes.TRANSPORTS
642
643 entry = self.get(command)
644
645 if not entry:
646 return False
647
648 if proto.name not in entry.transports:
649 return False
650
651 return True
652
653 # Constants specifying which transports a wire protocol command should be
654 # available on. For use with @wireprotocommand.
655 POLICY_V1_ONLY = 'v1-only'
656 POLICY_V2_ONLY = 'v2-only'
657
658 # For version 1 transports.
659 commands = commanddict()
660
661 # For version 2 transports.
662 commandsv2 = commanddict()
663
664 def wireprotocommand(name, args=None, transportpolicy=POLICY_V1_ONLY,
665 permission='push'):
666 """Decorator to declare a wire protocol command.
667
668 ``name`` is the name of the wire protocol command being provided.
669
670 ``args`` defines the named arguments accepted by the command. It is
671 ideally a dict mapping argument names to their types. For backwards
672 compatibility, it can be a space-delimited list of argument names. For
673 version 1 transports, ``*`` denotes a special value that says to accept
674 all named arguments.
675
676 ``transportpolicy`` is a POLICY_* constant denoting which transports
677 this wire protocol command should be exposed to. By default, commands
678 are exposed to all wire protocol transports.
679
680 ``permission`` defines the permission type needed to run this command.
681 Can be ``push`` or ``pull``. These roughly map to read-write and read-only,
682 respectively. Default is to assume command requires ``push`` permissions
683 because otherwise commands not declaring their permissions could modify
684 a repository that is supposed to be read-only.
685 """
686 if transportpolicy == POLICY_V1_ONLY:
687 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
688 if v['version'] == 1}
689 transportversion = 1
690 elif transportpolicy == POLICY_V2_ONLY:
691 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
692 if v['version'] == 2}
693 transportversion = 2
694 else:
695 raise error.ProgrammingError('invalid transport policy value: %s' %
696 transportpolicy)
697
698 # Because SSHv2 is a mirror of SSHv1, we allow "batch" commands through to
699 # SSHv2.
700 # TODO undo this hack when SSH is using the unified frame protocol.
701 if name == b'batch':
702 transports.add(wireprototypes.SSHV2)
703
704 if permission not in ('push', 'pull'):
705 raise error.ProgrammingError('invalid wire protocol permission; '
706 'got %s; expected "push" or "pull"' %
707 permission)
708
709 if transportversion == 1:
710 if args is None:
711 args = ''
712
713 if not isinstance(args, bytes):
714 raise error.ProgrammingError('arguments for version 1 commands '
715 'must be declared as bytes')
716 elif transportversion == 2:
717 if args is None:
718 args = {}
719
720 if not isinstance(args, dict):
721 raise error.ProgrammingError('arguments for version 2 commands '
722 'must be declared as dicts')
723
724 def register(func):
725 if transportversion == 1:
726 if name in commands:
727 raise error.ProgrammingError('%s command already registered '
728 'for version 1' % name)
729 commands[name] = commandentry(func, args=args,
730 transports=transports,
731 permission=permission)
732 elif transportversion == 2:
733 if name in commandsv2:
734 raise error.ProgrammingError('%s command already registered '
735 'for version 2' % name)
736
737 commandsv2[name] = commandentry(func, args=args,
738 transports=transports,
739 permission=permission)
740 else:
741 raise error.ProgrammingError('unhandled transport version: %d' %
742 transportversion)
743
744 return func
745 return register
746
747 # TODO define a more appropriate permissions type to use for this.
748 @wireprotocommand('batch', 'cmds *', permission='pull',
749 transportpolicy=POLICY_V1_ONLY)
750 def batch(repo, proto, cmds, others):
751 unescapearg = wireprototypes.unescapebatcharg
752 repo = repo.filtered("served")
753 res = []
754 for pair in cmds.split(';'):
755 op, args = pair.split(' ', 1)
756 vals = {}
757 for a in args.split(','):
758 if a:
759 n, v = a.split('=')
760 vals[unescapearg(n)] = unescapearg(v)
761 func, spec = commands[op]
762
763 # Validate that client has permissions to perform this command.
764 perm = commands[op].permission
765 assert perm in ('push', 'pull')
766 proto.checkperm(perm)
767
768 if spec:
769 keys = spec.split()
770 data = {}
771 for k in keys:
772 if k == '*':
773 star = {}
774 for key in vals.keys():
775 if key not in keys:
776 star[key] = vals[key]
777 data['*'] = star
778 else:
779 data[k] = vals[k]
780 result = func(repo, proto, *[data[k] for k in keys])
781 else:
782 result = func(repo, proto)
783 if isinstance(result, wireprototypes.ooberror):
784 return result
785
786 # For now, all batchable commands must return bytesresponse or
787 # raw bytes (for backwards compatibility).
788 assert isinstance(result, (wireprototypes.bytesresponse, bytes))
789 if isinstance(result, wireprototypes.bytesresponse):
790 result = result.data
791 res.append(wireprototypes.escapebatcharg(result))
792
793 return wireprototypes.bytesresponse(';'.join(res))
794
795 @wireprotocommand('between', 'pairs', transportpolicy=POLICY_V1_ONLY,
796 permission='pull')
797 def between(repo, proto, pairs):
798 pairs = [wireprototypes.decodelist(p, '-') for p in pairs.split(" ")]
799 r = []
800 for b in repo.between(pairs):
801 r.append(wireprototypes.encodelist(b) + "\n")
802
803 return wireprototypes.bytesresponse(''.join(r))
804
805 @wireprotocommand('branchmap', permission='pull',
806 transportpolicy=POLICY_V1_ONLY)
807 def branchmap(repo, proto):
808 branchmap = repo.branchmap()
809 heads = []
810 for branch, nodes in branchmap.iteritems():
811 branchname = urlreq.quote(encoding.fromlocal(branch))
812 branchnodes = wireprototypes.encodelist(nodes)
813 heads.append('%s %s' % (branchname, branchnodes))
814
815 return wireprototypes.bytesresponse('\n'.join(heads))
816
817 @wireprotocommand('branches', 'nodes', transportpolicy=POLICY_V1_ONLY,
818 permission='pull')
819 def branches(repo, proto, nodes):
820 nodes = wireprototypes.decodelist(nodes)
821 r = []
822 for b in repo.branches(nodes):
823 r.append(wireprototypes.encodelist(b) + "\n")
824
825 return wireprototypes.bytesresponse(''.join(r))
826
827 @wireprotocommand('clonebundles', '', permission='pull',
828 transportpolicy=POLICY_V1_ONLY)
829 def clonebundles(repo, proto):
830 """Server command for returning info for available bundles to seed clones.
831
832 Clients will parse this response and determine what bundle to fetch.
833
834 Extensions may wrap this command to filter or dynamically emit data
835 depending on the request. e.g. you could advertise URLs for the closest
836 data center given the client's IP address.
837 """
838 return wireprototypes.bytesresponse(
839 repo.vfs.tryread('clonebundles.manifest'))
840
841 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
842 'known', 'getbundle', 'unbundlehash']
843
844 def _capabilities(repo, proto):
845 """return a list of capabilities for a repo
846
847 This function exists to allow extensions to easily wrap capabilities
848 computation
849
850 - returns a lists: easy to alter
851 - change done here will be propagated to both `capabilities` and `hello`
852 command without any other action needed.
853 """
854 # copy to prevent modification of the global list
855 caps = list(wireprotocaps)
856
857 # Command of same name as capability isn't exposed to version 1 of
858 # transports. So conditionally add it.
859 if commands.commandavailable('changegroupsubset', proto):
860 caps.append('changegroupsubset')
861
862 if streamclone.allowservergeneration(repo):
863 if repo.ui.configbool('server', 'preferuncompressed'):
864 caps.append('stream-preferred')
865 requiredformats = repo.requirements & repo.supportedformats
866 # if our local revlogs are just revlogv1, add 'stream' cap
867 if not requiredformats - {'revlogv1'}:
868 caps.append('stream')
869 # otherwise, add 'streamreqs' detailing our local revlog format
870 else:
871 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
872 if repo.ui.configbool('experimental', 'bundle2-advertise'):
873 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
874 caps.append('bundle2=' + urlreq.quote(capsblob))
875 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
876
877 return proto.addcapabilities(repo, caps)
878
879 # If you are writing an extension and consider wrapping this function. Wrap
880 # `_capabilities` instead.
881 @wireprotocommand('capabilities', permission='pull',
882 transportpolicy=POLICY_V1_ONLY)
883 def capabilities(repo, proto):
884 caps = _capabilities(repo, proto)
885 return wireprototypes.bytesresponse(' '.join(sorted(caps)))
886
887 @wireprotocommand('changegroup', 'roots', transportpolicy=POLICY_V1_ONLY,
888 permission='pull')
889 def changegroup(repo, proto, roots):
890 nodes = wireprototypes.decodelist(roots)
891 outgoing = discovery.outgoing(repo, missingroots=nodes,
892 missingheads=repo.heads())
893 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
894 gen = iter(lambda: cg.read(32768), '')
895 return wireprototypes.streamres(gen=gen)
896
897 @wireprotocommand('changegroupsubset', 'bases heads',
898 transportpolicy=POLICY_V1_ONLY,
899 permission='pull')
900 def changegroupsubset(repo, proto, bases, heads):
901 bases = wireprototypes.decodelist(bases)
902 heads = wireprototypes.decodelist(heads)
903 outgoing = discovery.outgoing(repo, missingroots=bases,
904 missingheads=heads)
905 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
906 gen = iter(lambda: cg.read(32768), '')
907 return wireprototypes.streamres(gen=gen)
908
909 @wireprotocommand('debugwireargs', 'one two *',
910 permission='pull', transportpolicy=POLICY_V1_ONLY)
911 def debugwireargs(repo, proto, one, two, others):
912 # only accept optional args from the known set
913 opts = options('debugwireargs', ['three', 'four'], others)
914 return wireprototypes.bytesresponse(repo.debugwireargs(
915 one, two, **pycompat.strkwargs(opts)))
916
917 def find_pullbundle(repo, proto, opts, clheads, heads, common):
918 """Return a file object for the first matching pullbundle.
919
920 Pullbundles are specified in .hg/pullbundles.manifest similar to
921 clonebundles.
922 For each entry, the bundle specification is checked for compatibility:
923 - Client features vs the BUNDLESPEC.
924 - Revisions shared with the clients vs base revisions of the bundle.
925 A bundle can be applied only if all its base revisions are known by
926 the client.
927 - At least one leaf of the bundle's DAG is missing on the client.
928 - Every leaf of the bundle's DAG is part of node set the client wants.
929 E.g. do not send a bundle of all changes if the client wants only
930 one specific branch of many.
931 """
932 def decodehexstring(s):
933 return set([h.decode('hex') for h in s.split(';')])
934
935 manifest = repo.vfs.tryread('pullbundles.manifest')
936 if not manifest:
937 return None
938 res = exchange.parseclonebundlesmanifest(repo, manifest)
939 res = exchange.filterclonebundleentries(repo, res)
940 if not res:
941 return None
942 cl = repo.changelog
943 heads_anc = cl.ancestors([cl.rev(rev) for rev in heads], inclusive=True)
944 common_anc = cl.ancestors([cl.rev(rev) for rev in common], inclusive=True)
945 compformats = clientcompressionsupport(proto)
946 for entry in res:
947 if 'COMPRESSION' in entry and entry['COMPRESSION'] not in compformats:
948 continue
949 # No test yet for VERSION, since V2 is supported by any client
950 # that advertises partial pulls
951 if 'heads' in entry:
952 try:
953 bundle_heads = decodehexstring(entry['heads'])
954 except TypeError:
955 # Bad heads entry
956 continue
957 if bundle_heads.issubset(common):
958 continue # Nothing new
959 if all(cl.rev(rev) in common_anc for rev in bundle_heads):
960 continue # Still nothing new
961 if any(cl.rev(rev) not in heads_anc and
962 cl.rev(rev) not in common_anc for rev in bundle_heads):
963 continue
964 if 'bases' in entry:
965 try:
966 bundle_bases = decodehexstring(entry['bases'])
967 except TypeError:
968 # Bad bases entry
969 continue
970 if not all(cl.rev(rev) in common_anc for rev in bundle_bases):
971 continue
972 path = entry['URL']
973 repo.ui.debug('sending pullbundle "%s"\n' % path)
974 try:
975 return repo.vfs.open(path)
976 except IOError:
977 repo.ui.debug('pullbundle "%s" not accessible\n' % path)
978 continue
979 return None
980
981 @wireprotocommand('getbundle', '*', permission='pull',
982 transportpolicy=POLICY_V1_ONLY)
983 def getbundle(repo, proto, others):
984 opts = options('getbundle', wireprototypes.GETBUNDLE_ARGUMENTS.keys(),
985 others)
986 for k, v in opts.iteritems():
987 keytype = wireprototypes.GETBUNDLE_ARGUMENTS[k]
988 if keytype == 'nodes':
989 opts[k] = wireprototypes.decodelist(v)
990 elif keytype == 'csv':
991 opts[k] = list(v.split(','))
992 elif keytype == 'scsv':
993 opts[k] = set(v.split(','))
994 elif keytype == 'boolean':
995 # Client should serialize False as '0', which is a non-empty string
996 # so it evaluates as a True bool.
997 if v == '0':
998 opts[k] = False
999 else:
1000 opts[k] = bool(v)
1001 elif keytype != 'plain':
1002 raise KeyError('unknown getbundle option type %s'
1003 % keytype)
1004
1005 if not bundle1allowed(repo, 'pull'):
1006 if not exchange.bundle2requested(opts.get('bundlecaps')):
1007 if proto.name == 'http-v1':
1008 return wireprototypes.ooberror(bundle2required)
1009 raise error.Abort(bundle2requiredmain,
1010 hint=bundle2requiredhint)
1011
1012 prefercompressed = True
1013
1014 try:
1015 clheads = set(repo.changelog.heads())
1016 heads = set(opts.get('heads', set()))
1017 common = set(opts.get('common', set()))
1018 common.discard(nullid)
1019 if (repo.ui.configbool('server', 'pullbundle') and
1020 'partial-pull' in proto.getprotocaps()):
1021 # Check if a pre-built bundle covers this request.
1022 bundle = find_pullbundle(repo, proto, opts, clheads, heads, common)
1023 if bundle:
1024 return wireprototypes.streamres(gen=util.filechunkiter(bundle),
1025 prefer_uncompressed=True)
1026
1027 if repo.ui.configbool('server', 'disablefullbundle'):
1028 # Check to see if this is a full clone.
1029 changegroup = opts.get('cg', True)
1030 if changegroup and not common and clheads == heads:
1031 raise error.Abort(
1032 _('server has pull-based clones disabled'),
1033 hint=_('remove --pull if specified or upgrade Mercurial'))
1034
1035 info, chunks = exchange.getbundlechunks(repo, 'serve',
1036 **pycompat.strkwargs(opts))
1037 prefercompressed = info.get('prefercompressed', True)
1038 except error.Abort as exc:
1039 # cleanly forward Abort error to the client
1040 if not exchange.bundle2requested(opts.get('bundlecaps')):
1041 if proto.name == 'http-v1':
1042 return wireprototypes.ooberror(pycompat.bytestr(exc) + '\n')
1043 raise # cannot do better for bundle1 + ssh
1044 # bundle2 request expect a bundle2 reply
1045 bundler = bundle2.bundle20(repo.ui)
1046 manargs = [('message', pycompat.bytestr(exc))]
1047 advargs = []
1048 if exc.hint is not None:
1049 advargs.append(('hint', exc.hint))
1050 bundler.addpart(bundle2.bundlepart('error:abort',
1051 manargs, advargs))
1052 chunks = bundler.getchunks()
1053 prefercompressed = False
1054
1055 return wireprototypes.streamres(
1056 gen=chunks, prefer_uncompressed=not prefercompressed)
1057
1058 @wireprotocommand('heads', permission='pull', transportpolicy=POLICY_V1_ONLY)
1059 def heads(repo, proto):
1060 h = repo.heads()
1061 return wireprototypes.bytesresponse(wireprototypes.encodelist(h) + '\n')
1062
1063 @wireprotocommand('hello', permission='pull', transportpolicy=POLICY_V1_ONLY)
1064 def hello(repo, proto):
1065 """Called as part of SSH handshake to obtain server info.
1066
1067 Returns a list of lines describing interesting things about the
1068 server, in an RFC822-like format.
1069
1070 Currently, the only one defined is ``capabilities``, which consists of a
1071 line of space separated tokens describing server abilities:
1072
1073 capabilities: <token0> <token1> <token2>
1074 """
1075 caps = capabilities(repo, proto).data
1076 return wireprototypes.bytesresponse('capabilities: %s\n' % caps)
1077
1078 @wireprotocommand('listkeys', 'namespace', permission='pull',
1079 transportpolicy=POLICY_V1_ONLY)
1080 def listkeys(repo, proto, namespace):
1081 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
1082 return wireprototypes.bytesresponse(pushkeymod.encodekeys(d))
1083
1084 @wireprotocommand('lookup', 'key', permission='pull',
1085 transportpolicy=POLICY_V1_ONLY)
1086 def lookup(repo, proto, key):
1087 try:
1088 k = encoding.tolocal(key)
1089 n = repo.lookup(k)
1090 r = hex(n)
1091 success = 1
1092 except Exception as inst:
1093 r = stringutil.forcebytestr(inst)
1094 success = 0
1095 return wireprototypes.bytesresponse('%d %s\n' % (success, r))
1096
1097 @wireprotocommand('known', 'nodes *', permission='pull',
1098 transportpolicy=POLICY_V1_ONLY)
1099 def known(repo, proto, nodes, others):
1100 v = ''.join(b and '1' or '0'
1101 for b in repo.known(wireprototypes.decodelist(nodes)))
1102 return wireprototypes.bytesresponse(v)
1103
1104 @wireprotocommand('protocaps', 'caps', permission='pull',
1105 transportpolicy=POLICY_V1_ONLY)
1106 def protocaps(repo, proto, caps):
1107 if proto.name == wireprototypes.SSHV1:
1108 proto._protocaps = set(caps.split(' '))
1109 return wireprototypes.bytesresponse('OK')
1110
1111 @wireprotocommand('pushkey', 'namespace key old new', permission='push',
1112 transportpolicy=POLICY_V1_ONLY)
1113 def pushkey(repo, proto, namespace, key, old, new):
1114 # compatibility with pre-1.8 clients which were accidentally
1115 # sending raw binary nodes rather than utf-8-encoded hex
1116 if len(new) == 20 and stringutil.escapestr(new) != new:
1117 # looks like it could be a binary node
1118 try:
1119 new.decode('utf-8')
1120 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
1121 except UnicodeDecodeError:
1122 pass # binary, leave unmodified
1123 else:
1124 new = encoding.tolocal(new) # normal path
1125
1126 with proto.mayberedirectstdio() as output:
1127 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
1128 encoding.tolocal(old), new) or False
1129
1130 output = output.getvalue() if output else ''
1131 return wireprototypes.bytesresponse('%d\n%s' % (int(r), output))
1132
1133 @wireprotocommand('stream_out', permission='pull',
1134 transportpolicy=POLICY_V1_ONLY)
1135 def stream(repo, proto):
1136 '''If the server supports streaming clone, it advertises the "stream"
1137 capability with a value representing the version and flags of the repo
1138 it is serving. Client checks to see if it understands the format.
1139 '''
1140 return wireprototypes.streamreslegacy(
1141 streamclone.generatev1wireproto(repo))
1142
1143 @wireprotocommand('unbundle', 'heads', permission='push',
1144 transportpolicy=POLICY_V1_ONLY)
1145 def unbundle(repo, proto, heads):
1146 their_heads = wireprototypes.decodelist(heads)
1147
1148 with proto.mayberedirectstdio() as output:
1149 try:
1150 exchange.check_heads(repo, their_heads, 'preparing changes')
1151 cleanup = lambda: None
1152 try:
1153 payload = proto.getpayload()
1154 if repo.ui.configbool('server', 'streamunbundle'):
1155 def cleanup():
1156 # Ensure that the full payload is consumed, so
1157 # that the connection doesn't contain trailing garbage.
1158 for p in payload:
1159 pass
1160 fp = util.chunkbuffer(payload)
1161 else:
1162 # write bundle data to temporary file as it can be big
1163 fp, tempname = None, None
1164 def cleanup():
1165 if fp:
1166 fp.close()
1167 if tempname:
1168 os.unlink(tempname)
1169 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1170 repo.ui.debug('redirecting incoming bundle to %s\n' %
1171 tempname)
1172 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1173 r = 0
1174 for p in payload:
1175 fp.write(p)
1176 fp.seek(0)
1177
1178 gen = exchange.readbundle(repo.ui, fp, None)
1179 if (isinstance(gen, changegroupmod.cg1unpacker)
1180 and not bundle1allowed(repo, 'push')):
1181 if proto.name == 'http-v1':
1182 # need to special case http because stderr do not get to
1183 # the http client on failed push so we need to abuse
1184 # some other error type to make sure the message get to
1185 # the user.
1186 return wireprototypes.ooberror(bundle2required)
1187 raise error.Abort(bundle2requiredmain,
1188 hint=bundle2requiredhint)
1189
1190 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1191 proto.client())
1192 if util.safehasattr(r, 'addpart'):
1193 # The return looks streamable, we are in the bundle2 case
1194 # and should return a stream.
1195 return wireprototypes.streamreslegacy(gen=r.getchunks())
1196 return wireprototypes.pushres(
1197 r, output.getvalue() if output else '')
1198
1199 finally:
1200 cleanup()
1201
1202 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1203 # handle non-bundle2 case first
1204 if not getattr(exc, 'duringunbundle2', False):
1205 try:
1206 raise
1207 except error.Abort:
1208 # The old code we moved used procutil.stderr directly.
1209 # We did not change it to minimise code change.
1210 # This need to be moved to something proper.
1211 # Feel free to do it.
1212 procutil.stderr.write("abort: %s\n" % exc)
1213 if exc.hint is not None:
1214 procutil.stderr.write("(%s)\n" % exc.hint)
1215 procutil.stderr.flush()
1216 return wireprototypes.pushres(
1217 0, output.getvalue() if output else '')
1218 except error.PushRaced:
1219 return wireprototypes.pusherr(
1220 pycompat.bytestr(exc),
1221 output.getvalue() if output else '')
1222
1223 bundler = bundle2.bundle20(repo.ui)
1224 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1225 bundler.addpart(out)
1226 try:
1227 try:
1228 raise
1229 except error.PushkeyFailed as exc:
1230 # check client caps
1231 remotecaps = getattr(exc, '_replycaps', None)
1232 if (remotecaps is not None
1233 and 'pushkey' not in remotecaps.get('error', ())):
1234 # no support remote side, fallback to Abort handler.
1235 raise
1236 part = bundler.newpart('error:pushkey')
1237 part.addparam('in-reply-to', exc.partid)
1238 if exc.namespace is not None:
1239 part.addparam('namespace', exc.namespace,
1240 mandatory=False)
1241 if exc.key is not None:
1242 part.addparam('key', exc.key, mandatory=False)
1243 if exc.new is not None:
1244 part.addparam('new', exc.new, mandatory=False)
1245 if exc.old is not None:
1246 part.addparam('old', exc.old, mandatory=False)
1247 if exc.ret is not None:
1248 part.addparam('ret', exc.ret, mandatory=False)
1249 except error.BundleValueError as exc:
1250 errpart = bundler.newpart('error:unsupportedcontent')
1251 if exc.parttype is not None:
1252 errpart.addparam('parttype', exc.parttype)
1253 if exc.params:
1254 errpart.addparam('params', '\0'.join(exc.params))
1255 except error.Abort as exc:
1256 manargs = [('message', stringutil.forcebytestr(exc))]
1257 advargs = []
1258 if exc.hint is not None:
1259 advargs.append(('hint', exc.hint))
1260 bundler.addpart(bundle2.bundlepart('error:abort',
1261 manargs, advargs))
1262 except error.PushRaced as exc:
1263 bundler.newpart('error:pushraced',
1264 [('message', stringutil.forcebytestr(exc))])
1265 return wireprototypes.streamreslegacy(gen=bundler.getchunks())
@@ -11,7 +11,7 b' from mercurial import ('
11 error,
11 error,
12 peer,
12 peer,
13 util,
13 util,
14 wireproto,
14 wireprotov1peer,
15 )
15 )
16
16
17 # equivalent of repo.repository
17 # equivalent of repo.repository
@@ -177,7 +177,7 b' class remotething(thing):'
177 yield r
177 yield r
178
178
179 def batchiter(self):
179 def batchiter(self):
180 return wireproto.remoteiterbatcher(self)
180 return wireprotov1peer.remoteiterbatcher(self)
181
181
182 @peer.batchable
182 @peer.batchable
183 def foo(self, one, two=None):
183 def foo(self, one, two=None):
@@ -7,6 +7,7 b' from mercurial import ('
7 util,
7 util,
8 wireproto,
8 wireproto,
9 wireprototypes,
9 wireprototypes,
10 wireprotov1peer,
10 )
11 )
11 stringio = util.stringio
12 stringio = util.stringio
12
13
@@ -29,7 +30,7 b" wireprototypes.TRANSPORTS['dummyproto'] "
29 'version': 1,
30 'version': 1,
30 }
31 }
31
32
32 class clientpeer(wireproto.wirepeer):
33 class clientpeer(wireprotov1peer.wirepeer):
33 def __init__(self, serverrepo, ui):
34 def __init__(self, serverrepo, ui):
34 self.serverrepo = serverrepo
35 self.serverrepo = serverrepo
35 self.ui = ui
36 self.ui = ui
@@ -65,9 +66,9 b' class clientpeer(wireproto.wirepeer):'
65 def _callstream(self, cmd, **args):
66 def _callstream(self, cmd, **args):
66 return stringio(self._call(cmd, **args))
67 return stringio(self._call(cmd, **args))
67
68
68 @wireproto.batchable
69 @wireprotov1peer.batchable
69 def greet(self, name):
70 def greet(self, name):
70 f = wireproto.future()
71 f = wireprotov1peer.future()
71 yield {b'name': mangle(name)}, f
72 yield {b'name': mangle(name)}, f
72 yield unmangle(f.value)
73 yield unmangle(f.value)
73
74
General Comments 0
You need to be logged in to leave comments. Login now