##// END OF EJS Templates
wireproto: split streamres into legacy and modern case...
Joerg Sonnenberger -
r35768:a39a9df7 default
parent child Browse files
Show More
@@ -1,190 +1,190
1 1 # Copyright 2011 Fog Creek Software
2 2 #
3 3 # This software may be used and distributed according to the terms of the
4 4 # GNU General Public License version 2 or any later version.
5 5 from __future__ import absolute_import
6 6
7 7 import os
8 8 import re
9 9
10 10 from mercurial.i18n import _
11 11
12 12 from mercurial import (
13 13 error,
14 14 httppeer,
15 15 util,
16 16 wireproto,
17 17 )
18 18
19 19 from . import (
20 20 lfutil,
21 21 )
22 22
23 23 urlerr = util.urlerr
24 24 urlreq = util.urlreq
25 25
26 26 LARGEFILES_REQUIRED_MSG = ('\nThis repository uses the largefiles extension.'
27 27 '\n\nPlease enable it in your Mercurial config '
28 28 'file.\n')
29 29
30 30 # these will all be replaced by largefiles.uisetup
31 31 ssholdcallstream = None
32 32 httpoldcallstream = None
33 33
34 34 def putlfile(repo, proto, sha):
35 35 '''Server command for putting a largefile into a repository's local store
36 36 and into the user cache.'''
37 37 proto.redirect()
38 38
39 39 path = lfutil.storepath(repo, sha)
40 40 util.makedirs(os.path.dirname(path))
41 41 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
42 42
43 43 try:
44 44 proto.getfile(tmpfp)
45 45 tmpfp._fp.seek(0)
46 46 if sha != lfutil.hexsha1(tmpfp._fp):
47 47 raise IOError(0, _('largefile contents do not match hash'))
48 48 tmpfp.close()
49 49 lfutil.linktousercache(repo, sha)
50 50 except IOError as e:
51 51 repo.ui.warn(_('largefiles: failed to put %s into store: %s\n') %
52 52 (sha, e.strerror))
53 53 return wireproto.pushres(1)
54 54 finally:
55 55 tmpfp.discard()
56 56
57 57 return wireproto.pushres(0)
58 58
59 59 def getlfile(repo, proto, sha):
60 60 '''Server command for retrieving a largefile from the repository-local
61 61 cache or user cache.'''
62 62 filename = lfutil.findfile(repo, sha)
63 63 if not filename:
64 64 raise error.Abort(_('requested largefile %s not present in cache')
65 65 % sha)
66 66 f = open(filename, 'rb')
67 67 length = os.fstat(f.fileno())[6]
68 68
69 69 # Since we can't set an HTTP content-length header here, and
70 70 # Mercurial core provides no way to give the length of a streamres
71 71 # (and reading the entire file into RAM would be ill-advised), we
72 72 # just send the length on the first line of the response, like the
73 73 # ssh proto does for string responses.
74 74 def generator():
75 75 yield '%d\n' % length
76 76 for chunk in util.filechunkiter(f):
77 77 yield chunk
78 return wireproto.streamres(gen=generator())
78 return wireproto.streamres_legacy(gen=generator())
79 79
80 80 def statlfile(repo, proto, sha):
81 81 '''Server command for checking if a largefile is present - returns '2\n' if
82 82 the largefile is missing, '0\n' if it seems to be in good condition.
83 83
84 84 The value 1 is reserved for mismatched checksum, but that is too expensive
85 85 to be verified on every stat and must be caught be running 'hg verify'
86 86 server side.'''
87 87 filename = lfutil.findfile(repo, sha)
88 88 if not filename:
89 89 return '2\n'
90 90 return '0\n'
91 91
92 92 def wirereposetup(ui, repo):
93 93 class lfileswirerepository(repo.__class__):
94 94 def putlfile(self, sha, fd):
95 95 # unfortunately, httprepository._callpush tries to convert its
96 96 # input file-like into a bundle before sending it, so we can't use
97 97 # it ...
98 98 if issubclass(self.__class__, httppeer.httppeer):
99 99 res = self._call('putlfile', data=fd, sha=sha,
100 100 headers={'content-type':'application/mercurial-0.1'})
101 101 try:
102 102 d, output = res.split('\n', 1)
103 103 for l in output.splitlines(True):
104 104 self.ui.warn(_('remote: '), l) # assume l ends with \n
105 105 return int(d)
106 106 except ValueError:
107 107 self.ui.warn(_('unexpected putlfile response: %r\n') % res)
108 108 return 1
109 109 # ... but we can't use sshrepository._call because the data=
110 110 # argument won't get sent, and _callpush does exactly what we want
111 111 # in this case: send the data straight through
112 112 else:
113 113 try:
114 114 ret, output = self._callpush("putlfile", fd, sha=sha)
115 115 if ret == "":
116 116 raise error.ResponseError(_('putlfile failed:'),
117 117 output)
118 118 return int(ret)
119 119 except IOError:
120 120 return 1
121 121 except ValueError:
122 122 raise error.ResponseError(
123 123 _('putlfile failed (unexpected response):'), ret)
124 124
125 125 def getlfile(self, sha):
126 126 """returns an iterable with the chunks of the file with sha sha"""
127 127 stream = self._callstream("getlfile", sha=sha)
128 128 length = stream.readline()
129 129 try:
130 130 length = int(length)
131 131 except ValueError:
132 132 self._abort(error.ResponseError(_("unexpected response:"),
133 133 length))
134 134
135 135 # SSH streams will block if reading more than length
136 136 for chunk in util.filechunkiter(stream, limit=length):
137 137 yield chunk
138 138 # HTTP streams must hit the end to process the last empty
139 139 # chunk of Chunked-Encoding so the connection can be reused.
140 140 if issubclass(self.__class__, httppeer.httppeer):
141 141 chunk = stream.read(1)
142 142 if chunk:
143 143 self._abort(error.ResponseError(_("unexpected response:"),
144 144 chunk))
145 145
146 146 @wireproto.batchable
147 147 def statlfile(self, sha):
148 148 f = wireproto.future()
149 149 result = {'sha': sha}
150 150 yield result, f
151 151 try:
152 152 yield int(f.value)
153 153 except (ValueError, urlerr.httperror):
154 154 # If the server returns anything but an integer followed by a
155 155 # newline, newline, it's not speaking our language; if we get
156 156 # an HTTP error, we can't be sure the largefile is present;
157 157 # either way, consider it missing.
158 158 yield 2
159 159
160 160 repo.__class__ = lfileswirerepository
161 161
162 162 # advertise the largefiles=serve capability
163 163 def _capabilities(orig, repo, proto):
164 164 '''announce largefile server capability'''
165 165 caps = orig(repo, proto)
166 166 caps.append('largefiles=serve')
167 167 return caps
168 168
169 169 def heads(repo, proto):
170 170 '''Wrap server command - largefile capable clients will know to call
171 171 lheads instead'''
172 172 if lfutil.islfilesrepo(repo):
173 173 return wireproto.ooberror(LARGEFILES_REQUIRED_MSG)
174 174 return wireproto.heads(repo, proto)
175 175
176 176 def sshrepocallstream(self, cmd, **args):
177 177 if cmd == 'heads' and self.capable('largefiles'):
178 178 cmd = 'lheads'
179 179 if cmd == 'batch' and self.capable('largefiles'):
180 180 args[r'cmds'] = args[r'cmds'].replace('heads ', 'lheads ')
181 181 return ssholdcallstream(self, cmd, **args)
182 182
183 183 headsre = re.compile(r'(^|;)heads\b')
184 184
185 185 def httprepocallstream(self, cmd, **args):
186 186 if cmd == 'heads' and self.capable('largefiles'):
187 187 cmd = 'lheads'
188 188 if cmd == 'batch' and self.capable('largefiles'):
189 189 args[r'cmds'] = headsre.sub('lheads', args[r'cmds'])
190 190 return httpoldcallstream(self, cmd, **args)
@@ -1,207 +1,201
1 1 #
2 2 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import cgi
11 11 import struct
12 12
13 13 from .common import (
14 14 HTTP_OK,
15 15 )
16 16
17 17 from .. import (
18 18 error,
19 19 pycompat,
20 20 util,
21 21 wireproto,
22 22 )
23 23 stringio = util.stringio
24 24
25 25 urlerr = util.urlerr
26 26 urlreq = util.urlreq
27 27
28 28 HGTYPE = 'application/mercurial-0.1'
29 29 HGTYPE2 = 'application/mercurial-0.2'
30 30 HGERRTYPE = 'application/hg-error'
31 31
32 32 def decodevaluefromheaders(req, headerprefix):
33 33 """Decode a long value from multiple HTTP request headers.
34 34
35 35 Returns the value as a bytes, not a str.
36 36 """
37 37 chunks = []
38 38 i = 1
39 39 prefix = headerprefix.upper().replace(r'-', r'_')
40 40 while True:
41 41 v = req.env.get(r'HTTP_%s_%d' % (prefix, i))
42 42 if v is None:
43 43 break
44 44 chunks.append(pycompat.bytesurl(v))
45 45 i += 1
46 46
47 47 return ''.join(chunks)
48 48
49 49 class webproto(wireproto.abstractserverproto):
50 50 def __init__(self, req, ui):
51 51 self.req = req
52 52 self.response = ''
53 53 self.ui = ui
54 54 self.name = 'http'
55 55
56 56 def getargs(self, args):
57 57 knownargs = self._args()
58 58 data = {}
59 59 keys = args.split()
60 60 for k in keys:
61 61 if k == '*':
62 62 star = {}
63 63 for key in knownargs.keys():
64 64 if key != 'cmd' and key not in keys:
65 65 star[key] = knownargs[key][0]
66 66 data['*'] = star
67 67 else:
68 68 data[k] = knownargs[k][0]
69 69 return [data[k] for k in keys]
70 70 def _args(self):
71 71 args = self.req.form.copy()
72 72 if pycompat.ispy3:
73 73 args = {k.encode('ascii'): [v.encode('ascii') for v in vs]
74 74 for k, vs in args.items()}
75 75 postlen = int(self.req.env.get(r'HTTP_X_HGARGS_POST', 0))
76 76 if postlen:
77 77 args.update(cgi.parse_qs(
78 78 self.req.read(postlen), keep_blank_values=True))
79 79 return args
80 80
81 81 argvalue = decodevaluefromheaders(self.req, r'X-HgArg')
82 82 args.update(cgi.parse_qs(argvalue, keep_blank_values=True))
83 83 return args
84 84 def getfile(self, fp):
85 85 length = int(self.req.env[r'CONTENT_LENGTH'])
86 86 # If httppostargs is used, we need to read Content-Length
87 87 # minus the amount that was consumed by args.
88 88 length -= int(self.req.env.get(r'HTTP_X_HGARGS_POST', 0))
89 89 for s in util.filechunkiter(self.req, limit=length):
90 90 fp.write(s)
91 91 def redirect(self):
92 92 self.oldio = self.ui.fout, self.ui.ferr
93 93 self.ui.ferr = self.ui.fout = stringio()
94 94 def restore(self):
95 95 val = self.ui.fout.getvalue()
96 96 self.ui.ferr, self.ui.fout = self.oldio
97 97 return val
98 98
99 99 def _client(self):
100 100 return 'remote:%s:%s:%s' % (
101 101 self.req.env.get('wsgi.url_scheme') or 'http',
102 102 urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
103 103 urlreq.quote(self.req.env.get('REMOTE_USER', '')))
104 104
105 def responsetype(self, v1compressible=False):
105 def responsetype(self, prefer_uncompressed):
106 106 """Determine the appropriate response type and compression settings.
107 107
108 The ``v1compressible`` argument states whether the response with
109 application/mercurial-0.1 media types should be zlib compressed.
110
111 108 Returns a tuple of (mediatype, compengine, engineopts).
112 109 """
113 # For now, if it isn't compressible in the old world, it's never
114 # compressible. We can change this to send uncompressed 0.2 payloads
115 # later.
116 if not v1compressible:
117 return HGTYPE, None, None
118
119 110 # Determine the response media type and compression engine based
120 111 # on the request parameters.
121 112 protocaps = decodevaluefromheaders(self.req, r'X-HgProto').split(' ')
122 113
123 114 if '0.2' in protocaps:
115 # All clients are expected to support uncompressed data.
116 if prefer_uncompressed:
117 return HGTYPE2, util._noopengine(), {}
118
124 119 # Default as defined by wire protocol spec.
125 120 compformats = ['zlib', 'none']
126 121 for cap in protocaps:
127 122 if cap.startswith('comp='):
128 123 compformats = cap[5:].split(',')
129 124 break
130 125
131 126 # Now find an agreed upon compression format.
132 127 for engine in wireproto.supportedcompengines(self.ui, self,
133 128 util.SERVERROLE):
134 129 if engine.wireprotosupport().name in compformats:
135 130 opts = {}
136 131 level = self.ui.configint('server',
137 132 '%slevel' % engine.name())
138 133 if level is not None:
139 134 opts['level'] = level
140 135
141 136 return HGTYPE2, engine, opts
142 137
143 138 # No mutually supported compression format. Fall back to the
144 139 # legacy protocol.
145 140
146 141 # Don't allow untrusted settings because disabling compression or
147 142 # setting a very high compression level could lead to flooding
148 143 # the server's network or CPU.
149 144 opts = {'level': self.ui.configint('server', 'zliblevel')}
150 145 return HGTYPE, util.compengines['zlib'], opts
151 146
152 147 def iscmd(cmd):
153 148 return cmd in wireproto.commands
154 149
155 150 def call(repo, req, cmd):
156 151 p = webproto(req, repo.ui)
157 152
158 def genversion2(gen, compress, engine, engineopts):
153 def genversion2(gen, engine, engineopts):
159 154 # application/mercurial-0.2 always sends a payload header
160 155 # identifying the compression engine.
161 156 name = engine.wireprotosupport().name
162 157 assert 0 < len(name) < 256
163 158 yield struct.pack('B', len(name))
164 159 yield name
165 160
166 if compress:
167 for chunk in engine.compressstream(gen, opts=engineopts):
168 yield chunk
169 else:
170 161 for chunk in gen:
171 162 yield chunk
172 163
173 164 rsp = wireproto.dispatch(repo, p, cmd)
174 165 if isinstance(rsp, bytes):
175 166 req.respond(HTTP_OK, HGTYPE, body=rsp)
176 167 return []
168 elif isinstance(rsp, wireproto.streamres_legacy):
169 gen = rsp.gen
170 req.respond(HTTP_OK, HGTYPE)
171 return gen
177 172 elif isinstance(rsp, wireproto.streamres):
178 173 gen = rsp.gen
179 174
180 175 # This code for compression should not be streamres specific. It
181 176 # is here because we only compress streamres at the moment.
182 mediatype, engine, engineopts = p.responsetype(rsp.v1compressible)
177 mediatype, engine, engineopts = p.responsetype(rsp.prefer_uncompressed)
178 gen = engine.compressstream(gen, engineopts)
183 179
184 if mediatype == HGTYPE and rsp.v1compressible:
185 gen = engine.compressstream(gen, engineopts)
186 elif mediatype == HGTYPE2:
187 gen = genversion2(gen, rsp.v1compressible, engine, engineopts)
180 if mediatype == HGTYPE2:
181 gen = genversion2(gen, engine, engineopts)
188 182
189 183 req.respond(HTTP_OK, mediatype)
190 184 return gen
191 185 elif isinstance(rsp, wireproto.pushres):
192 186 val = p.restore()
193 187 rsp = '%d\n%s' % (rsp.res, val)
194 188 req.respond(HTTP_OK, HGTYPE, body=rsp)
195 189 return []
196 190 elif isinstance(rsp, wireproto.pusherr):
197 191 # drain the incoming bundle
198 192 req.drain()
199 193 p.restore()
200 194 rsp = '0\n%s\n' % rsp.res
201 195 req.respond(HTTP_OK, HGTYPE, body=rsp)
202 196 return []
203 197 elif isinstance(rsp, wireproto.ooberror):
204 198 rsp = rsp.message
205 199 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
206 200 return []
207 201 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
@@ -1,130 +1,131
1 1 # sshserver.py - ssh protocol server support for mercurial
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import sys
12 12
13 13 from .i18n import _
14 14 from . import (
15 15 encoding,
16 16 error,
17 17 hook,
18 18 util,
19 19 wireproto,
20 20 )
21 21
22 22 class sshserver(wireproto.abstractserverproto):
23 23 def __init__(self, ui, repo):
24 24 self.ui = ui
25 25 self.repo = repo
26 26 self.lock = None
27 27 self.fin = ui.fin
28 28 self.fout = ui.fout
29 29 self.name = 'ssh'
30 30
31 31 hook.redirect(True)
32 32 ui.fout = repo.ui.fout = ui.ferr
33 33
34 34 # Prevent insertion/deletion of CRs
35 35 util.setbinary(self.fin)
36 36 util.setbinary(self.fout)
37 37
38 38 def getargs(self, args):
39 39 data = {}
40 40 keys = args.split()
41 41 for n in xrange(len(keys)):
42 42 argline = self.fin.readline()[:-1]
43 43 arg, l = argline.split()
44 44 if arg not in keys:
45 45 raise error.Abort(_("unexpected parameter %r") % arg)
46 46 if arg == '*':
47 47 star = {}
48 48 for k in xrange(int(l)):
49 49 argline = self.fin.readline()[:-1]
50 50 arg, l = argline.split()
51 51 val = self.fin.read(int(l))
52 52 star[arg] = val
53 53 data['*'] = star
54 54 else:
55 55 val = self.fin.read(int(l))
56 56 data[arg] = val
57 57 return [data[k] for k in keys]
58 58
59 59 def getarg(self, name):
60 60 return self.getargs(name)[0]
61 61
62 62 def getfile(self, fpout):
63 63 self.sendresponse('')
64 64 count = int(self.fin.readline())
65 65 while count:
66 66 fpout.write(self.fin.read(count))
67 67 count = int(self.fin.readline())
68 68
69 69 def redirect(self):
70 70 pass
71 71
72 72 def sendresponse(self, v):
73 73 self.fout.write("%d\n" % len(v))
74 74 self.fout.write(v)
75 75 self.fout.flush()
76 76
77 77 def sendstream(self, source):
78 78 write = self.fout.write
79 79 for chunk in source.gen:
80 80 write(chunk)
81 81 self.fout.flush()
82 82
83 83 def sendpushresponse(self, rsp):
84 84 self.sendresponse('')
85 85 self.sendresponse(str(rsp.res))
86 86
87 87 def sendpusherror(self, rsp):
88 88 self.sendresponse(rsp.res)
89 89
90 90 def sendooberror(self, rsp):
91 91 self.ui.ferr.write('%s\n-\n' % rsp.message)
92 92 self.ui.ferr.flush()
93 93 self.fout.write('\n')
94 94 self.fout.flush()
95 95
96 96 def serve_forever(self):
97 97 try:
98 98 while self.serve_one():
99 99 pass
100 100 finally:
101 101 if self.lock is not None:
102 102 self.lock.release()
103 103 sys.exit(0)
104 104
105 105 handlers = {
106 106 str: sendresponse,
107 107 wireproto.streamres: sendstream,
108 wireproto.streamres_legacy: sendstream,
108 109 wireproto.pushres: sendpushresponse,
109 110 wireproto.pusherr: sendpusherror,
110 111 wireproto.ooberror: sendooberror,
111 112 }
112 113
113 114 def serve_one(self):
114 115 cmd = self.fin.readline()[:-1]
115 116 if cmd and cmd in wireproto.commands:
116 117 rsp = wireproto.dispatch(self.repo, self, cmd)
117 118 self.handlers[rsp.__class__](self, rsp)
118 119 elif cmd:
119 120 impl = getattr(self, 'do_' + cmd, None)
120 121 if impl:
121 122 r = impl()
122 123 if r is not None:
123 124 self.sendresponse(r)
124 125 else:
125 126 self.sendresponse("")
126 127 return cmd != ''
127 128
128 129 def _client(self):
129 130 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
130 131 return 'remote:ssh:' + client
@@ -1,1057 +1,1068
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 discovery,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 repository,
32 32 streamclone,
33 33 util,
34 34 )
35 35
36 36 urlerr = util.urlerr
37 37 urlreq = util.urlreq
38 38
39 39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 41 'IncompatibleClient')
42 42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43 43
44 44 class abstractserverproto(object):
45 45 """abstract class that summarizes the protocol API
46 46
47 47 Used as reference and documentation.
48 48 """
49 49
50 50 def getargs(self, args):
51 51 """return the value for arguments in <args>
52 52
53 53 returns a list of values (same order as <args>)"""
54 54 raise NotImplementedError()
55 55
56 56 def getfile(self, fp):
57 57 """write the whole content of a file into a file like object
58 58
59 59 The file is in the form::
60 60
61 61 (<chunk-size>\n<chunk>)+0\n
62 62
63 63 chunk size is the ascii version of the int.
64 64 """
65 65 raise NotImplementedError()
66 66
67 67 def redirect(self):
68 68 """may setup interception for stdout and stderr
69 69
70 70 See also the `restore` method."""
71 71 raise NotImplementedError()
72 72
73 73 # If the `redirect` function does install interception, the `restore`
74 74 # function MUST be defined. If interception is not used, this function
75 75 # MUST NOT be defined.
76 76 #
77 77 # left commented here on purpose
78 78 #
79 79 #def restore(self):
80 80 # """reinstall previous stdout and stderr and return intercepted stdout
81 81 # """
82 82 # raise NotImplementedError()
83 83
84 84 class remoteiterbatcher(peer.iterbatcher):
85 85 def __init__(self, remote):
86 86 super(remoteiterbatcher, self).__init__()
87 87 self._remote = remote
88 88
89 89 def __getattr__(self, name):
90 90 # Validate this method is batchable, since submit() only supports
91 91 # batchable methods.
92 92 fn = getattr(self._remote, name)
93 93 if not getattr(fn, 'batchable', None):
94 94 raise error.ProgrammingError('Attempted to batch a non-batchable '
95 95 'call to %r' % name)
96 96
97 97 return super(remoteiterbatcher, self).__getattr__(name)
98 98
99 99 def submit(self):
100 100 """Break the batch request into many patch calls and pipeline them.
101 101
102 102 This is mostly valuable over http where request sizes can be
103 103 limited, but can be used in other places as well.
104 104 """
105 105 # 2-tuple of (command, arguments) that represents what will be
106 106 # sent over the wire.
107 107 requests = []
108 108
109 109 # 4-tuple of (command, final future, @batchable generator, remote
110 110 # future).
111 111 results = []
112 112
113 113 for command, args, opts, finalfuture in self.calls:
114 114 mtd = getattr(self._remote, command)
115 115 batchable = mtd.batchable(mtd.__self__, *args, **opts)
116 116
117 117 commandargs, fremote = next(batchable)
118 118 assert fremote
119 119 requests.append((command, commandargs))
120 120 results.append((command, finalfuture, batchable, fremote))
121 121
122 122 if requests:
123 123 self._resultiter = self._remote._submitbatch(requests)
124 124
125 125 self._results = results
126 126
127 127 def results(self):
128 128 for command, finalfuture, batchable, remotefuture in self._results:
129 129 # Get the raw result, set it in the remote future, feed it
130 130 # back into the @batchable generator so it can be decoded, and
131 131 # set the result on the final future to this value.
132 132 remoteresult = next(self._resultiter)
133 133 remotefuture.set(remoteresult)
134 134 finalfuture.set(next(batchable))
135 135
136 136 # Verify our @batchable generators only emit 2 values.
137 137 try:
138 138 next(batchable)
139 139 except StopIteration:
140 140 pass
141 141 else:
142 142 raise error.ProgrammingError('%s @batchable generator emitted '
143 143 'unexpected value count' % command)
144 144
145 145 yield finalfuture.value
146 146
147 147 # Forward a couple of names from peer to make wireproto interactions
148 148 # slightly more sensible.
149 149 batchable = peer.batchable
150 150 future = peer.future
151 151
152 152 # list of nodes encoding / decoding
153 153
154 154 def decodelist(l, sep=' '):
155 155 if l:
156 156 return [bin(v) for v in l.split(sep)]
157 157 return []
158 158
159 159 def encodelist(l, sep=' '):
160 160 try:
161 161 return sep.join(map(hex, l))
162 162 except TypeError:
163 163 raise
164 164
165 165 # batched call argument encoding
166 166
167 167 def escapearg(plain):
168 168 return (plain
169 169 .replace(':', ':c')
170 170 .replace(',', ':o')
171 171 .replace(';', ':s')
172 172 .replace('=', ':e'))
173 173
174 174 def unescapearg(escaped):
175 175 return (escaped
176 176 .replace(':e', '=')
177 177 .replace(':s', ';')
178 178 .replace(':o', ',')
179 179 .replace(':c', ':'))
180 180
181 181 def encodebatchcmds(req):
182 182 """Return a ``cmds`` argument value for the ``batch`` command."""
183 183 cmds = []
184 184 for op, argsdict in req:
185 185 # Old servers didn't properly unescape argument names. So prevent
186 186 # the sending of argument names that may not be decoded properly by
187 187 # servers.
188 188 assert all(escapearg(k) == k for k in argsdict)
189 189
190 190 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
191 191 for k, v in argsdict.iteritems())
192 192 cmds.append('%s %s' % (op, args))
193 193
194 194 return ';'.join(cmds)
195 195
196 196 # mapping of options accepted by getbundle and their types
197 197 #
198 198 # Meant to be extended by extensions. It is extensions responsibility to ensure
199 199 # such options are properly processed in exchange.getbundle.
200 200 #
201 201 # supported types are:
202 202 #
203 203 # :nodes: list of binary nodes
204 204 # :csv: list of comma-separated values
205 205 # :scsv: list of comma-separated values return as set
206 206 # :plain: string with no transformation needed.
207 207 gboptsmap = {'heads': 'nodes',
208 208 'bookmarks': 'boolean',
209 209 'common': 'nodes',
210 210 'obsmarkers': 'boolean',
211 211 'phases': 'boolean',
212 212 'bundlecaps': 'scsv',
213 213 'listkeys': 'csv',
214 214 'cg': 'boolean',
215 215 'cbattempted': 'boolean'}
216 216
217 217 # client side
218 218
219 219 class wirepeer(repository.legacypeer):
220 220 """Client-side interface for communicating with a peer repository.
221 221
222 222 Methods commonly call wire protocol commands of the same name.
223 223
224 224 See also httppeer.py and sshpeer.py for protocol-specific
225 225 implementations of this interface.
226 226 """
227 227 # Begin of basewirepeer interface.
228 228
229 229 def iterbatch(self):
230 230 return remoteiterbatcher(self)
231 231
232 232 @batchable
233 233 def lookup(self, key):
234 234 self.requirecap('lookup', _('look up remote revision'))
235 235 f = future()
236 236 yield {'key': encoding.fromlocal(key)}, f
237 237 d = f.value
238 238 success, data = d[:-1].split(" ", 1)
239 239 if int(success):
240 240 yield bin(data)
241 241 else:
242 242 self._abort(error.RepoError(data))
243 243
244 244 @batchable
245 245 def heads(self):
246 246 f = future()
247 247 yield {}, f
248 248 d = f.value
249 249 try:
250 250 yield decodelist(d[:-1])
251 251 except ValueError:
252 252 self._abort(error.ResponseError(_("unexpected response:"), d))
253 253
254 254 @batchable
255 255 def known(self, nodes):
256 256 f = future()
257 257 yield {'nodes': encodelist(nodes)}, f
258 258 d = f.value
259 259 try:
260 260 yield [bool(int(b)) for b in d]
261 261 except ValueError:
262 262 self._abort(error.ResponseError(_("unexpected response:"), d))
263 263
264 264 @batchable
265 265 def branchmap(self):
266 266 f = future()
267 267 yield {}, f
268 268 d = f.value
269 269 try:
270 270 branchmap = {}
271 271 for branchpart in d.splitlines():
272 272 branchname, branchheads = branchpart.split(' ', 1)
273 273 branchname = encoding.tolocal(urlreq.unquote(branchname))
274 274 branchheads = decodelist(branchheads)
275 275 branchmap[branchname] = branchheads
276 276 yield branchmap
277 277 except TypeError:
278 278 self._abort(error.ResponseError(_("unexpected response:"), d))
279 279
280 280 @batchable
281 281 def listkeys(self, namespace):
282 282 if not self.capable('pushkey'):
283 283 yield {}, None
284 284 f = future()
285 285 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
286 286 yield {'namespace': encoding.fromlocal(namespace)}, f
287 287 d = f.value
288 288 self.ui.debug('received listkey for "%s": %i bytes\n'
289 289 % (namespace, len(d)))
290 290 yield pushkeymod.decodekeys(d)
291 291
292 292 @batchable
293 293 def pushkey(self, namespace, key, old, new):
294 294 if not self.capable('pushkey'):
295 295 yield False, None
296 296 f = future()
297 297 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
298 298 yield {'namespace': encoding.fromlocal(namespace),
299 299 'key': encoding.fromlocal(key),
300 300 'old': encoding.fromlocal(old),
301 301 'new': encoding.fromlocal(new)}, f
302 302 d = f.value
303 303 d, output = d.split('\n', 1)
304 304 try:
305 305 d = bool(int(d))
306 306 except ValueError:
307 307 raise error.ResponseError(
308 308 _('push failed (unexpected response):'), d)
309 309 for l in output.splitlines(True):
310 310 self.ui.status(_('remote: '), l)
311 311 yield d
312 312
313 313 def stream_out(self):
314 314 return self._callstream('stream_out')
315 315
316 316 def getbundle(self, source, **kwargs):
317 317 kwargs = pycompat.byteskwargs(kwargs)
318 318 self.requirecap('getbundle', _('look up remote changes'))
319 319 opts = {}
320 320 bundlecaps = kwargs.get('bundlecaps')
321 321 if bundlecaps is not None:
322 322 kwargs['bundlecaps'] = sorted(bundlecaps)
323 323 else:
324 324 bundlecaps = () # kwargs could have it to None
325 325 for key, value in kwargs.iteritems():
326 326 if value is None:
327 327 continue
328 328 keytype = gboptsmap.get(key)
329 329 if keytype is None:
330 330 raise error.ProgrammingError(
331 331 'Unexpectedly None keytype for key %s' % key)
332 332 elif keytype == 'nodes':
333 333 value = encodelist(value)
334 334 elif keytype in ('csv', 'scsv'):
335 335 value = ','.join(value)
336 336 elif keytype == 'boolean':
337 337 value = '%i' % bool(value)
338 338 elif keytype != 'plain':
339 339 raise KeyError('unknown getbundle option type %s'
340 340 % keytype)
341 341 opts[key] = value
342 342 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
343 343 if any((cap.startswith('HG2') for cap in bundlecaps)):
344 344 return bundle2.getunbundler(self.ui, f)
345 345 else:
346 346 return changegroupmod.cg1unpacker(f, 'UN')
347 347
348 348 def unbundle(self, cg, heads, url):
349 349 '''Send cg (a readable file-like object representing the
350 350 changegroup to push, typically a chunkbuffer object) to the
351 351 remote server as a bundle.
352 352
353 353 When pushing a bundle10 stream, return an integer indicating the
354 354 result of the push (see changegroup.apply()).
355 355
356 356 When pushing a bundle20 stream, return a bundle20 stream.
357 357
358 358 `url` is the url the client thinks it's pushing to, which is
359 359 visible to hooks.
360 360 '''
361 361
362 362 if heads != ['force'] and self.capable('unbundlehash'):
363 363 heads = encodelist(['hashed',
364 364 hashlib.sha1(''.join(sorted(heads))).digest()])
365 365 else:
366 366 heads = encodelist(heads)
367 367
368 368 if util.safehasattr(cg, 'deltaheader'):
369 369 # this a bundle10, do the old style call sequence
370 370 ret, output = self._callpush("unbundle", cg, heads=heads)
371 371 if ret == "":
372 372 raise error.ResponseError(
373 373 _('push failed:'), output)
374 374 try:
375 375 ret = int(ret)
376 376 except ValueError:
377 377 raise error.ResponseError(
378 378 _('push failed (unexpected response):'), ret)
379 379
380 380 for l in output.splitlines(True):
381 381 self.ui.status(_('remote: '), l)
382 382 else:
383 383 # bundle2 push. Send a stream, fetch a stream.
384 384 stream = self._calltwowaystream('unbundle', cg, heads=heads)
385 385 ret = bundle2.getunbundler(self.ui, stream)
386 386 return ret
387 387
388 388 # End of basewirepeer interface.
389 389
390 390 # Begin of baselegacywirepeer interface.
391 391
392 392 def branches(self, nodes):
393 393 n = encodelist(nodes)
394 394 d = self._call("branches", nodes=n)
395 395 try:
396 396 br = [tuple(decodelist(b)) for b in d.splitlines()]
397 397 return br
398 398 except ValueError:
399 399 self._abort(error.ResponseError(_("unexpected response:"), d))
400 400
401 401 def between(self, pairs):
402 402 batch = 8 # avoid giant requests
403 403 r = []
404 404 for i in xrange(0, len(pairs), batch):
405 405 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
406 406 d = self._call("between", pairs=n)
407 407 try:
408 408 r.extend(l and decodelist(l) or [] for l in d.splitlines())
409 409 except ValueError:
410 410 self._abort(error.ResponseError(_("unexpected response:"), d))
411 411 return r
412 412
413 413 def changegroup(self, nodes, kind):
414 414 n = encodelist(nodes)
415 415 f = self._callcompressable("changegroup", roots=n)
416 416 return changegroupmod.cg1unpacker(f, 'UN')
417 417
418 418 def changegroupsubset(self, bases, heads, kind):
419 419 self.requirecap('changegroupsubset', _('look up remote changes'))
420 420 bases = encodelist(bases)
421 421 heads = encodelist(heads)
422 422 f = self._callcompressable("changegroupsubset",
423 423 bases=bases, heads=heads)
424 424 return changegroupmod.cg1unpacker(f, 'UN')
425 425
426 426 # End of baselegacywirepeer interface.
427 427
428 428 def _submitbatch(self, req):
429 429 """run batch request <req> on the server
430 430
431 431 Returns an iterator of the raw responses from the server.
432 432 """
433 433 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
434 434 chunk = rsp.read(1024)
435 435 work = [chunk]
436 436 while chunk:
437 437 while ';' not in chunk and chunk:
438 438 chunk = rsp.read(1024)
439 439 work.append(chunk)
440 440 merged = ''.join(work)
441 441 while ';' in merged:
442 442 one, merged = merged.split(';', 1)
443 443 yield unescapearg(one)
444 444 chunk = rsp.read(1024)
445 445 work = [merged, chunk]
446 446 yield unescapearg(''.join(work))
447 447
448 448 def _submitone(self, op, args):
449 449 return self._call(op, **pycompat.strkwargs(args))
450 450
451 451 def debugwireargs(self, one, two, three=None, four=None, five=None):
452 452 # don't pass optional arguments left at their default value
453 453 opts = {}
454 454 if three is not None:
455 455 opts[r'three'] = three
456 456 if four is not None:
457 457 opts[r'four'] = four
458 458 return self._call('debugwireargs', one=one, two=two, **opts)
459 459
460 460 def _call(self, cmd, **args):
461 461 """execute <cmd> on the server
462 462
463 463 The command is expected to return a simple string.
464 464
465 465 returns the server reply as a string."""
466 466 raise NotImplementedError()
467 467
468 468 def _callstream(self, cmd, **args):
469 469 """execute <cmd> on the server
470 470
471 471 The command is expected to return a stream. Note that if the
472 472 command doesn't return a stream, _callstream behaves
473 473 differently for ssh and http peers.
474 474
475 475 returns the server reply as a file like object.
476 476 """
477 477 raise NotImplementedError()
478 478
479 479 def _callcompressable(self, cmd, **args):
480 480 """execute <cmd> on the server
481 481
482 482 The command is expected to return a stream.
483 483
484 484 The stream may have been compressed in some implementations. This
485 485 function takes care of the decompression. This is the only difference
486 486 with _callstream.
487 487
488 488 returns the server reply as a file like object.
489 489 """
490 490 raise NotImplementedError()
491 491
492 492 def _callpush(self, cmd, fp, **args):
493 493 """execute a <cmd> on server
494 494
495 495 The command is expected to be related to a push. Push has a special
496 496 return method.
497 497
498 498 returns the server reply as a (ret, output) tuple. ret is either
499 499 empty (error) or a stringified int.
500 500 """
501 501 raise NotImplementedError()
502 502
503 503 def _calltwowaystream(self, cmd, fp, **args):
504 504 """execute <cmd> on server
505 505
506 506 The command will send a stream to the server and get a stream in reply.
507 507 """
508 508 raise NotImplementedError()
509 509
510 510 def _abort(self, exception):
511 511 """clearly abort the wire protocol connection and raise the exception
512 512 """
513 513 raise NotImplementedError()
514 514
515 515 # server side
516 516
517 517 # wire protocol command can either return a string or one of these classes.
518 518 class streamres(object):
519 519 """wireproto reply: binary stream
520 520
521 521 The call was successful and the result is a stream.
522 522
523 523 Accepts a generator containing chunks of data to be sent to the client.
524 524
525 ``v1compressible`` indicates whether this data can be compressed to
526 "version 1" clients (technically: HTTP peers using
527 application/mercurial-0.1 media type). This flag should NOT be used on
528 new commands because new clients should support a more modern compression
529 mechanism.
525 ``prefer_uncompressed`` indicates that the data is expected to be
526 uncompressable and that the stream should therefore use the ``none``
527 engine.
530 528 """
531 def __init__(self, gen=None, v1compressible=False):
529 def __init__(self, gen=None, prefer_uncompressed=False):
532 530 self.gen = gen
533 self.v1compressible = v1compressible
531 self.prefer_uncompressed = prefer_uncompressed
532
533 class streamres_legacy(object):
534 """wireproto reply: uncompressed binary stream
535
536 The call was successful and the result is a stream.
537
538 Accepts a generator containing chunks of data to be sent to the client.
539
540 Like ``streamres``, but sends an uncompressed data for "version 1" clients
541 using the application/mercurial-0.1 media type.
542 """
543 def __init__(self, gen=None):
544 self.gen = gen
534 545
535 546 class pushres(object):
536 547 """wireproto reply: success with simple integer return
537 548
538 549 The call was successful and returned an integer contained in `self.res`.
539 550 """
540 551 def __init__(self, res):
541 552 self.res = res
542 553
543 554 class pusherr(object):
544 555 """wireproto reply: failure
545 556
546 557 The call failed. The `self.res` attribute contains the error message.
547 558 """
548 559 def __init__(self, res):
549 560 self.res = res
550 561
551 562 class ooberror(object):
552 563 """wireproto reply: failure of a batch of operation
553 564
554 565 Something failed during a batch call. The error message is stored in
555 566 `self.message`.
556 567 """
557 568 def __init__(self, message):
558 569 self.message = message
559 570
560 571 def getdispatchrepo(repo, proto, command):
561 572 """Obtain the repo used for processing wire protocol commands.
562 573
563 574 The intent of this function is to serve as a monkeypatch point for
564 575 extensions that need commands to operate on different repo views under
565 576 specialized circumstances.
566 577 """
567 578 return repo.filtered('served')
568 579
569 580 def dispatch(repo, proto, command):
570 581 repo = getdispatchrepo(repo, proto, command)
571 582 func, spec = commands[command]
572 583 args = proto.getargs(spec)
573 584 return func(repo, proto, *args)
574 585
575 586 def options(cmd, keys, others):
576 587 opts = {}
577 588 for k in keys:
578 589 if k in others:
579 590 opts[k] = others[k]
580 591 del others[k]
581 592 if others:
582 593 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
583 594 % (cmd, ",".join(others)))
584 595 return opts
585 596
586 597 def bundle1allowed(repo, action):
587 598 """Whether a bundle1 operation is allowed from the server.
588 599
589 600 Priority is:
590 601
591 602 1. server.bundle1gd.<action> (if generaldelta active)
592 603 2. server.bundle1.<action>
593 604 3. server.bundle1gd (if generaldelta active)
594 605 4. server.bundle1
595 606 """
596 607 ui = repo.ui
597 608 gd = 'generaldelta' in repo.requirements
598 609
599 610 if gd:
600 611 v = ui.configbool('server', 'bundle1gd.%s' % action)
601 612 if v is not None:
602 613 return v
603 614
604 615 v = ui.configbool('server', 'bundle1.%s' % action)
605 616 if v is not None:
606 617 return v
607 618
608 619 if gd:
609 620 v = ui.configbool('server', 'bundle1gd')
610 621 if v is not None:
611 622 return v
612 623
613 624 return ui.configbool('server', 'bundle1')
614 625
615 626 def supportedcompengines(ui, proto, role):
616 627 """Obtain the list of supported compression engines for a request."""
617 628 assert role in (util.CLIENTROLE, util.SERVERROLE)
618 629
619 630 compengines = util.compengines.supportedwireengines(role)
620 631
621 632 # Allow config to override default list and ordering.
622 633 if role == util.SERVERROLE:
623 634 configengines = ui.configlist('server', 'compressionengines')
624 635 config = 'server.compressionengines'
625 636 else:
626 637 # This is currently implemented mainly to facilitate testing. In most
627 638 # cases, the server should be in charge of choosing a compression engine
628 639 # because a server has the most to lose from a sub-optimal choice. (e.g.
629 640 # CPU DoS due to an expensive engine or a network DoS due to poor
630 641 # compression ratio).
631 642 configengines = ui.configlist('experimental',
632 643 'clientcompressionengines')
633 644 config = 'experimental.clientcompressionengines'
634 645
635 646 # No explicit config. Filter out the ones that aren't supposed to be
636 647 # advertised and return default ordering.
637 648 if not configengines:
638 649 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
639 650 return [e for e in compengines
640 651 if getattr(e.wireprotosupport(), attr) > 0]
641 652
642 653 # If compression engines are listed in the config, assume there is a good
643 654 # reason for it (like server operators wanting to achieve specific
644 655 # performance characteristics). So fail fast if the config references
645 656 # unusable compression engines.
646 657 validnames = set(e.name() for e in compengines)
647 658 invalidnames = set(e for e in configengines if e not in validnames)
648 659 if invalidnames:
649 660 raise error.Abort(_('invalid compression engine defined in %s: %s') %
650 661 (config, ', '.join(sorted(invalidnames))))
651 662
652 663 compengines = [e for e in compengines if e.name() in configengines]
653 664 compengines = sorted(compengines,
654 665 key=lambda e: configengines.index(e.name()))
655 666
656 667 if not compengines:
657 668 raise error.Abort(_('%s config option does not specify any known '
658 669 'compression engines') % config,
659 670 hint=_('usable compression engines: %s') %
660 671 ', '.sorted(validnames))
661 672
662 673 return compengines
663 674
664 675 # list of commands
665 676 commands = {}
666 677
667 678 def wireprotocommand(name, args=''):
668 679 """decorator for wire protocol command"""
669 680 def register(func):
670 681 commands[name] = (func, args)
671 682 return func
672 683 return register
673 684
674 685 @wireprotocommand('batch', 'cmds *')
675 686 def batch(repo, proto, cmds, others):
676 687 repo = repo.filtered("served")
677 688 res = []
678 689 for pair in cmds.split(';'):
679 690 op, args = pair.split(' ', 1)
680 691 vals = {}
681 692 for a in args.split(','):
682 693 if a:
683 694 n, v = a.split('=')
684 695 vals[unescapearg(n)] = unescapearg(v)
685 696 func, spec = commands[op]
686 697 if spec:
687 698 keys = spec.split()
688 699 data = {}
689 700 for k in keys:
690 701 if k == '*':
691 702 star = {}
692 703 for key in vals.keys():
693 704 if key not in keys:
694 705 star[key] = vals[key]
695 706 data['*'] = star
696 707 else:
697 708 data[k] = vals[k]
698 709 result = func(repo, proto, *[data[k] for k in keys])
699 710 else:
700 711 result = func(repo, proto)
701 712 if isinstance(result, ooberror):
702 713 return result
703 714 res.append(escapearg(result))
704 715 return ';'.join(res)
705 716
706 717 @wireprotocommand('between', 'pairs')
707 718 def between(repo, proto, pairs):
708 719 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
709 720 r = []
710 721 for b in repo.between(pairs):
711 722 r.append(encodelist(b) + "\n")
712 723 return "".join(r)
713 724
714 725 @wireprotocommand('branchmap')
715 726 def branchmap(repo, proto):
716 727 branchmap = repo.branchmap()
717 728 heads = []
718 729 for branch, nodes in branchmap.iteritems():
719 730 branchname = urlreq.quote(encoding.fromlocal(branch))
720 731 branchnodes = encodelist(nodes)
721 732 heads.append('%s %s' % (branchname, branchnodes))
722 733 return '\n'.join(heads)
723 734
724 735 @wireprotocommand('branches', 'nodes')
725 736 def branches(repo, proto, nodes):
726 737 nodes = decodelist(nodes)
727 738 r = []
728 739 for b in repo.branches(nodes):
729 740 r.append(encodelist(b) + "\n")
730 741 return "".join(r)
731 742
732 743 @wireprotocommand('clonebundles', '')
733 744 def clonebundles(repo, proto):
734 745 """Server command for returning info for available bundles to seed clones.
735 746
736 747 Clients will parse this response and determine what bundle to fetch.
737 748
738 749 Extensions may wrap this command to filter or dynamically emit data
739 750 depending on the request. e.g. you could advertise URLs for the closest
740 751 data center given the client's IP address.
741 752 """
742 753 return repo.vfs.tryread('clonebundles.manifest')
743 754
744 755 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
745 756 'known', 'getbundle', 'unbundlehash', 'batch']
746 757
747 758 def _capabilities(repo, proto):
748 759 """return a list of capabilities for a repo
749 760
750 761 This function exists to allow extensions to easily wrap capabilities
751 762 computation
752 763
753 764 - returns a lists: easy to alter
754 765 - change done here will be propagated to both `capabilities` and `hello`
755 766 command without any other action needed.
756 767 """
757 768 # copy to prevent modification of the global list
758 769 caps = list(wireprotocaps)
759 770 if streamclone.allowservergeneration(repo):
760 771 if repo.ui.configbool('server', 'preferuncompressed'):
761 772 caps.append('stream-preferred')
762 773 requiredformats = repo.requirements & repo.supportedformats
763 774 # if our local revlogs are just revlogv1, add 'stream' cap
764 775 if not requiredformats - {'revlogv1'}:
765 776 caps.append('stream')
766 777 # otherwise, add 'streamreqs' detailing our local revlog format
767 778 else:
768 779 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
769 780 if repo.ui.configbool('experimental', 'bundle2-advertise'):
770 781 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
771 782 caps.append('bundle2=' + urlreq.quote(capsblob))
772 783 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
773 784
774 785 if proto.name == 'http':
775 786 caps.append('httpheader=%d' %
776 787 repo.ui.configint('server', 'maxhttpheaderlen'))
777 788 if repo.ui.configbool('experimental', 'httppostargs'):
778 789 caps.append('httppostargs')
779 790
780 791 # FUTURE advertise 0.2rx once support is implemented
781 792 # FUTURE advertise minrx and mintx after consulting config option
782 793 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
783 794
784 795 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
785 796 if compengines:
786 797 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
787 798 for e in compengines)
788 799 caps.append('compression=%s' % comptypes)
789 800
790 801 return caps
791 802
792 803 # If you are writing an extension and consider wrapping this function. Wrap
793 804 # `_capabilities` instead.
794 805 @wireprotocommand('capabilities')
795 806 def capabilities(repo, proto):
796 807 return ' '.join(_capabilities(repo, proto))
797 808
798 809 @wireprotocommand('changegroup', 'roots')
799 810 def changegroup(repo, proto, roots):
800 811 nodes = decodelist(roots)
801 812 outgoing = discovery.outgoing(repo, missingroots=nodes,
802 813 missingheads=repo.heads())
803 814 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
804 815 gen = iter(lambda: cg.read(32768), '')
805 return streamres(gen=gen, v1compressible=True)
816 return streamres(gen=gen)
806 817
807 818 @wireprotocommand('changegroupsubset', 'bases heads')
808 819 def changegroupsubset(repo, proto, bases, heads):
809 820 bases = decodelist(bases)
810 821 heads = decodelist(heads)
811 822 outgoing = discovery.outgoing(repo, missingroots=bases,
812 823 missingheads=heads)
813 824 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
814 825 gen = iter(lambda: cg.read(32768), '')
815 return streamres(gen=gen, v1compressible=True)
826 return streamres(gen=gen)
816 827
817 828 @wireprotocommand('debugwireargs', 'one two *')
818 829 def debugwireargs(repo, proto, one, two, others):
819 830 # only accept optional args from the known set
820 831 opts = options('debugwireargs', ['three', 'four'], others)
821 832 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
822 833
823 834 @wireprotocommand('getbundle', '*')
824 835 def getbundle(repo, proto, others):
825 836 opts = options('getbundle', gboptsmap.keys(), others)
826 837 for k, v in opts.iteritems():
827 838 keytype = gboptsmap[k]
828 839 if keytype == 'nodes':
829 840 opts[k] = decodelist(v)
830 841 elif keytype == 'csv':
831 842 opts[k] = list(v.split(','))
832 843 elif keytype == 'scsv':
833 844 opts[k] = set(v.split(','))
834 845 elif keytype == 'boolean':
835 846 # Client should serialize False as '0', which is a non-empty string
836 847 # so it evaluates as a True bool.
837 848 if v == '0':
838 849 opts[k] = False
839 850 else:
840 851 opts[k] = bool(v)
841 852 elif keytype != 'plain':
842 853 raise KeyError('unknown getbundle option type %s'
843 854 % keytype)
844 855
845 856 if not bundle1allowed(repo, 'pull'):
846 857 if not exchange.bundle2requested(opts.get('bundlecaps')):
847 858 if proto.name == 'http':
848 859 return ooberror(bundle2required)
849 860 raise error.Abort(bundle2requiredmain,
850 861 hint=bundle2requiredhint)
851 862
852 863 try:
853 864 if repo.ui.configbool('server', 'disablefullbundle'):
854 865 # Check to see if this is a full clone.
855 866 clheads = set(repo.changelog.heads())
856 867 heads = set(opts.get('heads', set()))
857 868 common = set(opts.get('common', set()))
858 869 common.discard(nullid)
859 870 if not common and clheads == heads:
860 871 raise error.Abort(
861 872 _('server has pull-based clones disabled'),
862 873 hint=_('remove --pull if specified or upgrade Mercurial'))
863 874
864 875 chunks = exchange.getbundlechunks(repo, 'serve',
865 876 **pycompat.strkwargs(opts))
866 877 except error.Abort as exc:
867 878 # cleanly forward Abort error to the client
868 879 if not exchange.bundle2requested(opts.get('bundlecaps')):
869 880 if proto.name == 'http':
870 881 return ooberror(str(exc) + '\n')
871 882 raise # cannot do better for bundle1 + ssh
872 883 # bundle2 request expect a bundle2 reply
873 884 bundler = bundle2.bundle20(repo.ui)
874 885 manargs = [('message', str(exc))]
875 886 advargs = []
876 887 if exc.hint is not None:
877 888 advargs.append(('hint', exc.hint))
878 889 bundler.addpart(bundle2.bundlepart('error:abort',
879 890 manargs, advargs))
880 return streamres(gen=bundler.getchunks(), v1compressible=True)
881 return streamres(gen=chunks, v1compressible=True)
891 return streamres(gen=bundler.getchunks())
892 return streamres(gen=chunks)
882 893
883 894 @wireprotocommand('heads')
884 895 def heads(repo, proto):
885 896 h = repo.heads()
886 897 return encodelist(h) + "\n"
887 898
888 899 @wireprotocommand('hello')
889 900 def hello(repo, proto):
890 901 '''the hello command returns a set of lines describing various
891 902 interesting things about the server, in an RFC822-like format.
892 903 Currently the only one defined is "capabilities", which
893 904 consists of a line in the form:
894 905
895 906 capabilities: space separated list of tokens
896 907 '''
897 908 return "capabilities: %s\n" % (capabilities(repo, proto))
898 909
899 910 @wireprotocommand('listkeys', 'namespace')
900 911 def listkeys(repo, proto, namespace):
901 912 d = repo.listkeys(encoding.tolocal(namespace)).items()
902 913 return pushkeymod.encodekeys(d)
903 914
904 915 @wireprotocommand('lookup', 'key')
905 916 def lookup(repo, proto, key):
906 917 try:
907 918 k = encoding.tolocal(key)
908 919 c = repo[k]
909 920 r = c.hex()
910 921 success = 1
911 922 except Exception as inst:
912 923 r = str(inst)
913 924 success = 0
914 925 return "%d %s\n" % (success, r)
915 926
916 927 @wireprotocommand('known', 'nodes *')
917 928 def known(repo, proto, nodes, others):
918 929 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
919 930
920 931 @wireprotocommand('pushkey', 'namespace key old new')
921 932 def pushkey(repo, proto, namespace, key, old, new):
922 933 # compatibility with pre-1.8 clients which were accidentally
923 934 # sending raw binary nodes rather than utf-8-encoded hex
924 935 if len(new) == 20 and util.escapestr(new) != new:
925 936 # looks like it could be a binary node
926 937 try:
927 938 new.decode('utf-8')
928 939 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
929 940 except UnicodeDecodeError:
930 941 pass # binary, leave unmodified
931 942 else:
932 943 new = encoding.tolocal(new) # normal path
933 944
934 945 if util.safehasattr(proto, 'restore'):
935 946
936 947 proto.redirect()
937 948
938 949 try:
939 950 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
940 951 encoding.tolocal(old), new) or False
941 952 except error.Abort:
942 953 r = False
943 954
944 955 output = proto.restore()
945 956
946 957 return '%s\n%s' % (int(r), output)
947 958
948 959 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
949 960 encoding.tolocal(old), new)
950 961 return '%s\n' % int(r)
951 962
952 963 @wireprotocommand('stream_out')
953 964 def stream(repo, proto):
954 965 '''If the server supports streaming clone, it advertises the "stream"
955 966 capability with a value representing the version and flags of the repo
956 967 it is serving. Client checks to see if it understands the format.
957 968 '''
958 return streamres(streamclone.generatev1wireproto(repo))
969 return streamres_legacy(streamclone.generatev1wireproto(repo))
959 970
960 971 @wireprotocommand('unbundle', 'heads')
961 972 def unbundle(repo, proto, heads):
962 973 their_heads = decodelist(heads)
963 974
964 975 try:
965 976 proto.redirect()
966 977
967 978 exchange.check_heads(repo, their_heads, 'preparing changes')
968 979
969 980 # write bundle data to temporary file because it can be big
970 981 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
971 982 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
972 983 r = 0
973 984 try:
974 985 proto.getfile(fp)
975 986 fp.seek(0)
976 987 gen = exchange.readbundle(repo.ui, fp, None)
977 988 if (isinstance(gen, changegroupmod.cg1unpacker)
978 989 and not bundle1allowed(repo, 'push')):
979 990 if proto.name == 'http':
980 991 # need to special case http because stderr do not get to
981 992 # the http client on failed push so we need to abuse some
982 993 # other error type to make sure the message get to the
983 994 # user.
984 995 return ooberror(bundle2required)
985 996 raise error.Abort(bundle2requiredmain,
986 997 hint=bundle2requiredhint)
987 998
988 999 r = exchange.unbundle(repo, gen, their_heads, 'serve',
989 1000 proto._client())
990 1001 if util.safehasattr(r, 'addpart'):
991 1002 # The return looks streamable, we are in the bundle2 case and
992 1003 # should return a stream.
993 return streamres(gen=r.getchunks())
1004 return streamres_legacy(gen=r.getchunks())
994 1005 return pushres(r)
995 1006
996 1007 finally:
997 1008 fp.close()
998 1009 os.unlink(tempname)
999 1010
1000 1011 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1001 1012 # handle non-bundle2 case first
1002 1013 if not getattr(exc, 'duringunbundle2', False):
1003 1014 try:
1004 1015 raise
1005 1016 except error.Abort:
1006 1017 # The old code we moved used util.stderr directly.
1007 1018 # We did not change it to minimise code change.
1008 1019 # This need to be moved to something proper.
1009 1020 # Feel free to do it.
1010 1021 util.stderr.write("abort: %s\n" % exc)
1011 1022 if exc.hint is not None:
1012 1023 util.stderr.write("(%s)\n" % exc.hint)
1013 1024 return pushres(0)
1014 1025 except error.PushRaced:
1015 1026 return pusherr(str(exc))
1016 1027
1017 1028 bundler = bundle2.bundle20(repo.ui)
1018 1029 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1019 1030 bundler.addpart(out)
1020 1031 try:
1021 1032 try:
1022 1033 raise
1023 1034 except error.PushkeyFailed as exc:
1024 1035 # check client caps
1025 1036 remotecaps = getattr(exc, '_replycaps', None)
1026 1037 if (remotecaps is not None
1027 1038 and 'pushkey' not in remotecaps.get('error', ())):
1028 1039 # no support remote side, fallback to Abort handler.
1029 1040 raise
1030 1041 part = bundler.newpart('error:pushkey')
1031 1042 part.addparam('in-reply-to', exc.partid)
1032 1043 if exc.namespace is not None:
1033 1044 part.addparam('namespace', exc.namespace, mandatory=False)
1034 1045 if exc.key is not None:
1035 1046 part.addparam('key', exc.key, mandatory=False)
1036 1047 if exc.new is not None:
1037 1048 part.addparam('new', exc.new, mandatory=False)
1038 1049 if exc.old is not None:
1039 1050 part.addparam('old', exc.old, mandatory=False)
1040 1051 if exc.ret is not None:
1041 1052 part.addparam('ret', exc.ret, mandatory=False)
1042 1053 except error.BundleValueError as exc:
1043 1054 errpart = bundler.newpart('error:unsupportedcontent')
1044 1055 if exc.parttype is not None:
1045 1056 errpart.addparam('parttype', exc.parttype)
1046 1057 if exc.params:
1047 1058 errpart.addparam('params', '\0'.join(exc.params))
1048 1059 except error.Abort as exc:
1049 1060 manargs = [('message', str(exc))]
1050 1061 advargs = []
1051 1062 if exc.hint is not None:
1052 1063 advargs.append(('hint', exc.hint))
1053 1064 bundler.addpart(bundle2.bundlepart('error:abort',
1054 1065 manargs, advargs))
1055 1066 except error.PushRaced as exc:
1056 1067 bundler.newpart('error:pushraced', [('message', str(exc))])
1057 return streamres(gen=bundler.getchunks())
1068 return streamres_legacy(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now