##// END OF EJS Templates
wireprotoserver: rename getfile() to forwardpayload() (API)...
Gregory Szorc -
r36087:90ca4986 default
parent child Browse files
Show More
@@ -1,189 +1,189 b''
1 # Copyright 2011 Fog Creek Software
1 # Copyright 2011 Fog Creek Software
2 #
2 #
3 # This software may be used and distributed according to the terms of the
3 # This software may be used and distributed according to the terms of the
4 # GNU General Public License version 2 or any later version.
4 # GNU General Public License version 2 or any later version.
5 from __future__ import absolute_import
5 from __future__ import absolute_import
6
6
7 import os
7 import os
8 import re
8 import re
9
9
10 from mercurial.i18n import _
10 from mercurial.i18n import _
11
11
12 from mercurial import (
12 from mercurial import (
13 error,
13 error,
14 httppeer,
14 httppeer,
15 util,
15 util,
16 wireproto,
16 wireproto,
17 )
17 )
18
18
19 from . import (
19 from . import (
20 lfutil,
20 lfutil,
21 )
21 )
22
22
23 urlerr = util.urlerr
23 urlerr = util.urlerr
24 urlreq = util.urlreq
24 urlreq = util.urlreq
25
25
26 LARGEFILES_REQUIRED_MSG = ('\nThis repository uses the largefiles extension.'
26 LARGEFILES_REQUIRED_MSG = ('\nThis repository uses the largefiles extension.'
27 '\n\nPlease enable it in your Mercurial config '
27 '\n\nPlease enable it in your Mercurial config '
28 'file.\n')
28 'file.\n')
29
29
30 # these will all be replaced by largefiles.uisetup
30 # these will all be replaced by largefiles.uisetup
31 ssholdcallstream = None
31 ssholdcallstream = None
32 httpoldcallstream = None
32 httpoldcallstream = None
33
33
34 def putlfile(repo, proto, sha):
34 def putlfile(repo, proto, sha):
35 '''Server command for putting a largefile into a repository's local store
35 '''Server command for putting a largefile into a repository's local store
36 and into the user cache.'''
36 and into the user cache.'''
37 with proto.mayberedirectstdio() as output:
37 with proto.mayberedirectstdio() as output:
38 path = lfutil.storepath(repo, sha)
38 path = lfutil.storepath(repo, sha)
39 util.makedirs(os.path.dirname(path))
39 util.makedirs(os.path.dirname(path))
40 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
40 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
41
41
42 try:
42 try:
43 proto.getfile(tmpfp)
43 proto.forwardpayload(tmpfp)
44 tmpfp._fp.seek(0)
44 tmpfp._fp.seek(0)
45 if sha != lfutil.hexsha1(tmpfp._fp):
45 if sha != lfutil.hexsha1(tmpfp._fp):
46 raise IOError(0, _('largefile contents do not match hash'))
46 raise IOError(0, _('largefile contents do not match hash'))
47 tmpfp.close()
47 tmpfp.close()
48 lfutil.linktousercache(repo, sha)
48 lfutil.linktousercache(repo, sha)
49 except IOError as e:
49 except IOError as e:
50 repo.ui.warn(_('largefiles: failed to put %s into store: %s\n') %
50 repo.ui.warn(_('largefiles: failed to put %s into store: %s\n') %
51 (sha, e.strerror))
51 (sha, e.strerror))
52 return wireproto.pushres(1, output.getvalue() if output else '')
52 return wireproto.pushres(1, output.getvalue() if output else '')
53 finally:
53 finally:
54 tmpfp.discard()
54 tmpfp.discard()
55
55
56 return wireproto.pushres(0, output.getvalue() if output else '')
56 return wireproto.pushres(0, output.getvalue() if output else '')
57
57
58 def getlfile(repo, proto, sha):
58 def getlfile(repo, proto, sha):
59 '''Server command for retrieving a largefile from the repository-local
59 '''Server command for retrieving a largefile from the repository-local
60 cache or user cache.'''
60 cache or user cache.'''
61 filename = lfutil.findfile(repo, sha)
61 filename = lfutil.findfile(repo, sha)
62 if not filename:
62 if not filename:
63 raise error.Abort(_('requested largefile %s not present in cache')
63 raise error.Abort(_('requested largefile %s not present in cache')
64 % sha)
64 % sha)
65 f = open(filename, 'rb')
65 f = open(filename, 'rb')
66 length = os.fstat(f.fileno())[6]
66 length = os.fstat(f.fileno())[6]
67
67
68 # Since we can't set an HTTP content-length header here, and
68 # Since we can't set an HTTP content-length header here, and
69 # Mercurial core provides no way to give the length of a streamres
69 # Mercurial core provides no way to give the length of a streamres
70 # (and reading the entire file into RAM would be ill-advised), we
70 # (and reading the entire file into RAM would be ill-advised), we
71 # just send the length on the first line of the response, like the
71 # just send the length on the first line of the response, like the
72 # ssh proto does for string responses.
72 # ssh proto does for string responses.
73 def generator():
73 def generator():
74 yield '%d\n' % length
74 yield '%d\n' % length
75 for chunk in util.filechunkiter(f):
75 for chunk in util.filechunkiter(f):
76 yield chunk
76 yield chunk
77 return wireproto.streamres_legacy(gen=generator())
77 return wireproto.streamres_legacy(gen=generator())
78
78
79 def statlfile(repo, proto, sha):
79 def statlfile(repo, proto, sha):
80 '''Server command for checking if a largefile is present - returns '2\n' if
80 '''Server command for checking if a largefile is present - returns '2\n' if
81 the largefile is missing, '0\n' if it seems to be in good condition.
81 the largefile is missing, '0\n' if it seems to be in good condition.
82
82
83 The value 1 is reserved for mismatched checksum, but that is too expensive
83 The value 1 is reserved for mismatched checksum, but that is too expensive
84 to be verified on every stat and must be caught be running 'hg verify'
84 to be verified on every stat and must be caught be running 'hg verify'
85 server side.'''
85 server side.'''
86 filename = lfutil.findfile(repo, sha)
86 filename = lfutil.findfile(repo, sha)
87 if not filename:
87 if not filename:
88 return '2\n'
88 return '2\n'
89 return '0\n'
89 return '0\n'
90
90
91 def wirereposetup(ui, repo):
91 def wirereposetup(ui, repo):
92 class lfileswirerepository(repo.__class__):
92 class lfileswirerepository(repo.__class__):
93 def putlfile(self, sha, fd):
93 def putlfile(self, sha, fd):
94 # unfortunately, httprepository._callpush tries to convert its
94 # unfortunately, httprepository._callpush tries to convert its
95 # input file-like into a bundle before sending it, so we can't use
95 # input file-like into a bundle before sending it, so we can't use
96 # it ...
96 # it ...
97 if issubclass(self.__class__, httppeer.httppeer):
97 if issubclass(self.__class__, httppeer.httppeer):
98 res = self._call('putlfile', data=fd, sha=sha,
98 res = self._call('putlfile', data=fd, sha=sha,
99 headers={'content-type':'application/mercurial-0.1'})
99 headers={'content-type':'application/mercurial-0.1'})
100 try:
100 try:
101 d, output = res.split('\n', 1)
101 d, output = res.split('\n', 1)
102 for l in output.splitlines(True):
102 for l in output.splitlines(True):
103 self.ui.warn(_('remote: '), l) # assume l ends with \n
103 self.ui.warn(_('remote: '), l) # assume l ends with \n
104 return int(d)
104 return int(d)
105 except ValueError:
105 except ValueError:
106 self.ui.warn(_('unexpected putlfile response: %r\n') % res)
106 self.ui.warn(_('unexpected putlfile response: %r\n') % res)
107 return 1
107 return 1
108 # ... but we can't use sshrepository._call because the data=
108 # ... but we can't use sshrepository._call because the data=
109 # argument won't get sent, and _callpush does exactly what we want
109 # argument won't get sent, and _callpush does exactly what we want
110 # in this case: send the data straight through
110 # in this case: send the data straight through
111 else:
111 else:
112 try:
112 try:
113 ret, output = self._callpush("putlfile", fd, sha=sha)
113 ret, output = self._callpush("putlfile", fd, sha=sha)
114 if ret == "":
114 if ret == "":
115 raise error.ResponseError(_('putlfile failed:'),
115 raise error.ResponseError(_('putlfile failed:'),
116 output)
116 output)
117 return int(ret)
117 return int(ret)
118 except IOError:
118 except IOError:
119 return 1
119 return 1
120 except ValueError:
120 except ValueError:
121 raise error.ResponseError(
121 raise error.ResponseError(
122 _('putlfile failed (unexpected response):'), ret)
122 _('putlfile failed (unexpected response):'), ret)
123
123
124 def getlfile(self, sha):
124 def getlfile(self, sha):
125 """returns an iterable with the chunks of the file with sha sha"""
125 """returns an iterable with the chunks of the file with sha sha"""
126 stream = self._callstream("getlfile", sha=sha)
126 stream = self._callstream("getlfile", sha=sha)
127 length = stream.readline()
127 length = stream.readline()
128 try:
128 try:
129 length = int(length)
129 length = int(length)
130 except ValueError:
130 except ValueError:
131 self._abort(error.ResponseError(_("unexpected response:"),
131 self._abort(error.ResponseError(_("unexpected response:"),
132 length))
132 length))
133
133
134 # SSH streams will block if reading more than length
134 # SSH streams will block if reading more than length
135 for chunk in util.filechunkiter(stream, limit=length):
135 for chunk in util.filechunkiter(stream, limit=length):
136 yield chunk
136 yield chunk
137 # HTTP streams must hit the end to process the last empty
137 # HTTP streams must hit the end to process the last empty
138 # chunk of Chunked-Encoding so the connection can be reused.
138 # chunk of Chunked-Encoding so the connection can be reused.
139 if issubclass(self.__class__, httppeer.httppeer):
139 if issubclass(self.__class__, httppeer.httppeer):
140 chunk = stream.read(1)
140 chunk = stream.read(1)
141 if chunk:
141 if chunk:
142 self._abort(error.ResponseError(_("unexpected response:"),
142 self._abort(error.ResponseError(_("unexpected response:"),
143 chunk))
143 chunk))
144
144
145 @wireproto.batchable
145 @wireproto.batchable
146 def statlfile(self, sha):
146 def statlfile(self, sha):
147 f = wireproto.future()
147 f = wireproto.future()
148 result = {'sha': sha}
148 result = {'sha': sha}
149 yield result, f
149 yield result, f
150 try:
150 try:
151 yield int(f.value)
151 yield int(f.value)
152 except (ValueError, urlerr.httperror):
152 except (ValueError, urlerr.httperror):
153 # If the server returns anything but an integer followed by a
153 # If the server returns anything but an integer followed by a
154 # newline, newline, it's not speaking our language; if we get
154 # newline, newline, it's not speaking our language; if we get
155 # an HTTP error, we can't be sure the largefile is present;
155 # an HTTP error, we can't be sure the largefile is present;
156 # either way, consider it missing.
156 # either way, consider it missing.
157 yield 2
157 yield 2
158
158
159 repo.__class__ = lfileswirerepository
159 repo.__class__ = lfileswirerepository
160
160
161 # advertise the largefiles=serve capability
161 # advertise the largefiles=serve capability
162 def _capabilities(orig, repo, proto):
162 def _capabilities(orig, repo, proto):
163 '''announce largefile server capability'''
163 '''announce largefile server capability'''
164 caps = orig(repo, proto)
164 caps = orig(repo, proto)
165 caps.append('largefiles=serve')
165 caps.append('largefiles=serve')
166 return caps
166 return caps
167
167
168 def heads(repo, proto):
168 def heads(repo, proto):
169 '''Wrap server command - largefile capable clients will know to call
169 '''Wrap server command - largefile capable clients will know to call
170 lheads instead'''
170 lheads instead'''
171 if lfutil.islfilesrepo(repo):
171 if lfutil.islfilesrepo(repo):
172 return wireproto.ooberror(LARGEFILES_REQUIRED_MSG)
172 return wireproto.ooberror(LARGEFILES_REQUIRED_MSG)
173 return wireproto.heads(repo, proto)
173 return wireproto.heads(repo, proto)
174
174
175 def sshrepocallstream(self, cmd, **args):
175 def sshrepocallstream(self, cmd, **args):
176 if cmd == 'heads' and self.capable('largefiles'):
176 if cmd == 'heads' and self.capable('largefiles'):
177 cmd = 'lheads'
177 cmd = 'lheads'
178 if cmd == 'batch' and self.capable('largefiles'):
178 if cmd == 'batch' and self.capable('largefiles'):
179 args[r'cmds'] = args[r'cmds'].replace('heads ', 'lheads ')
179 args[r'cmds'] = args[r'cmds'].replace('heads ', 'lheads ')
180 return ssholdcallstream(self, cmd, **args)
180 return ssholdcallstream(self, cmd, **args)
181
181
182 headsre = re.compile(r'(^|;)heads\b')
182 headsre = re.compile(r'(^|;)heads\b')
183
183
184 def httprepocallstream(self, cmd, **args):
184 def httprepocallstream(self, cmd, **args):
185 if cmd == 'heads' and self.capable('largefiles'):
185 if cmd == 'heads' and self.capable('largefiles'):
186 cmd = 'lheads'
186 cmd = 'lheads'
187 if cmd == 'batch' and self.capable('largefiles'):
187 if cmd == 'batch' and self.capable('largefiles'):
188 args[r'cmds'] = headsre.sub('lheads', args[r'cmds'])
188 args[r'cmds'] = headsre.sub('lheads', args[r'cmds'])
189 return httpoldcallstream(self, cmd, **args)
189 return httpoldcallstream(self, cmd, **args)
@@ -1,1096 +1,1096 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import os
11 import os
12 import tempfile
12 import tempfile
13
13
14 from .i18n import _
14 from .i18n import _
15 from .node import (
15 from .node import (
16 bin,
16 bin,
17 hex,
17 hex,
18 nullid,
18 nullid,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 discovery,
24 discovery,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 pycompat,
30 pycompat,
31 repository,
31 repository,
32 streamclone,
32 streamclone,
33 util,
33 util,
34 )
34 )
35
35
36 urlerr = util.urlerr
36 urlerr = util.urlerr
37 urlreq = util.urlreq
37 urlreq = util.urlreq
38
38
39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 'IncompatibleClient')
41 'IncompatibleClient')
42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43
43
44 class remoteiterbatcher(peer.iterbatcher):
44 class remoteiterbatcher(peer.iterbatcher):
45 def __init__(self, remote):
45 def __init__(self, remote):
46 super(remoteiterbatcher, self).__init__()
46 super(remoteiterbatcher, self).__init__()
47 self._remote = remote
47 self._remote = remote
48
48
49 def __getattr__(self, name):
49 def __getattr__(self, name):
50 # Validate this method is batchable, since submit() only supports
50 # Validate this method is batchable, since submit() only supports
51 # batchable methods.
51 # batchable methods.
52 fn = getattr(self._remote, name)
52 fn = getattr(self._remote, name)
53 if not getattr(fn, 'batchable', None):
53 if not getattr(fn, 'batchable', None):
54 raise error.ProgrammingError('Attempted to batch a non-batchable '
54 raise error.ProgrammingError('Attempted to batch a non-batchable '
55 'call to %r' % name)
55 'call to %r' % name)
56
56
57 return super(remoteiterbatcher, self).__getattr__(name)
57 return super(remoteiterbatcher, self).__getattr__(name)
58
58
59 def submit(self):
59 def submit(self):
60 """Break the batch request into many patch calls and pipeline them.
60 """Break the batch request into many patch calls and pipeline them.
61
61
62 This is mostly valuable over http where request sizes can be
62 This is mostly valuable over http where request sizes can be
63 limited, but can be used in other places as well.
63 limited, but can be used in other places as well.
64 """
64 """
65 # 2-tuple of (command, arguments) that represents what will be
65 # 2-tuple of (command, arguments) that represents what will be
66 # sent over the wire.
66 # sent over the wire.
67 requests = []
67 requests = []
68
68
69 # 4-tuple of (command, final future, @batchable generator, remote
69 # 4-tuple of (command, final future, @batchable generator, remote
70 # future).
70 # future).
71 results = []
71 results = []
72
72
73 for command, args, opts, finalfuture in self.calls:
73 for command, args, opts, finalfuture in self.calls:
74 mtd = getattr(self._remote, command)
74 mtd = getattr(self._remote, command)
75 batchable = mtd.batchable(mtd.__self__, *args, **opts)
75 batchable = mtd.batchable(mtd.__self__, *args, **opts)
76
76
77 commandargs, fremote = next(batchable)
77 commandargs, fremote = next(batchable)
78 assert fremote
78 assert fremote
79 requests.append((command, commandargs))
79 requests.append((command, commandargs))
80 results.append((command, finalfuture, batchable, fremote))
80 results.append((command, finalfuture, batchable, fremote))
81
81
82 if requests:
82 if requests:
83 self._resultiter = self._remote._submitbatch(requests)
83 self._resultiter = self._remote._submitbatch(requests)
84
84
85 self._results = results
85 self._results = results
86
86
87 def results(self):
87 def results(self):
88 for command, finalfuture, batchable, remotefuture in self._results:
88 for command, finalfuture, batchable, remotefuture in self._results:
89 # Get the raw result, set it in the remote future, feed it
89 # Get the raw result, set it in the remote future, feed it
90 # back into the @batchable generator so it can be decoded, and
90 # back into the @batchable generator so it can be decoded, and
91 # set the result on the final future to this value.
91 # set the result on the final future to this value.
92 remoteresult = next(self._resultiter)
92 remoteresult = next(self._resultiter)
93 remotefuture.set(remoteresult)
93 remotefuture.set(remoteresult)
94 finalfuture.set(next(batchable))
94 finalfuture.set(next(batchable))
95
95
96 # Verify our @batchable generators only emit 2 values.
96 # Verify our @batchable generators only emit 2 values.
97 try:
97 try:
98 next(batchable)
98 next(batchable)
99 except StopIteration:
99 except StopIteration:
100 pass
100 pass
101 else:
101 else:
102 raise error.ProgrammingError('%s @batchable generator emitted '
102 raise error.ProgrammingError('%s @batchable generator emitted '
103 'unexpected value count' % command)
103 'unexpected value count' % command)
104
104
105 yield finalfuture.value
105 yield finalfuture.value
106
106
107 # Forward a couple of names from peer to make wireproto interactions
107 # Forward a couple of names from peer to make wireproto interactions
108 # slightly more sensible.
108 # slightly more sensible.
109 batchable = peer.batchable
109 batchable = peer.batchable
110 future = peer.future
110 future = peer.future
111
111
112 # list of nodes encoding / decoding
112 # list of nodes encoding / decoding
113
113
114 def decodelist(l, sep=' '):
114 def decodelist(l, sep=' '):
115 if l:
115 if l:
116 return [bin(v) for v in l.split(sep)]
116 return [bin(v) for v in l.split(sep)]
117 return []
117 return []
118
118
119 def encodelist(l, sep=' '):
119 def encodelist(l, sep=' '):
120 try:
120 try:
121 return sep.join(map(hex, l))
121 return sep.join(map(hex, l))
122 except TypeError:
122 except TypeError:
123 raise
123 raise
124
124
125 # batched call argument encoding
125 # batched call argument encoding
126
126
127 def escapearg(plain):
127 def escapearg(plain):
128 return (plain
128 return (plain
129 .replace(':', ':c')
129 .replace(':', ':c')
130 .replace(',', ':o')
130 .replace(',', ':o')
131 .replace(';', ':s')
131 .replace(';', ':s')
132 .replace('=', ':e'))
132 .replace('=', ':e'))
133
133
134 def unescapearg(escaped):
134 def unescapearg(escaped):
135 return (escaped
135 return (escaped
136 .replace(':e', '=')
136 .replace(':e', '=')
137 .replace(':s', ';')
137 .replace(':s', ';')
138 .replace(':o', ',')
138 .replace(':o', ',')
139 .replace(':c', ':'))
139 .replace(':c', ':'))
140
140
141 def encodebatchcmds(req):
141 def encodebatchcmds(req):
142 """Return a ``cmds`` argument value for the ``batch`` command."""
142 """Return a ``cmds`` argument value for the ``batch`` command."""
143 cmds = []
143 cmds = []
144 for op, argsdict in req:
144 for op, argsdict in req:
145 # Old servers didn't properly unescape argument names. So prevent
145 # Old servers didn't properly unescape argument names. So prevent
146 # the sending of argument names that may not be decoded properly by
146 # the sending of argument names that may not be decoded properly by
147 # servers.
147 # servers.
148 assert all(escapearg(k) == k for k in argsdict)
148 assert all(escapearg(k) == k for k in argsdict)
149
149
150 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
150 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
151 for k, v in argsdict.iteritems())
151 for k, v in argsdict.iteritems())
152 cmds.append('%s %s' % (op, args))
152 cmds.append('%s %s' % (op, args))
153
153
154 return ';'.join(cmds)
154 return ';'.join(cmds)
155
155
156 # mapping of options accepted by getbundle and their types
156 # mapping of options accepted by getbundle and their types
157 #
157 #
158 # Meant to be extended by extensions. It is extensions responsibility to ensure
158 # Meant to be extended by extensions. It is extensions responsibility to ensure
159 # such options are properly processed in exchange.getbundle.
159 # such options are properly processed in exchange.getbundle.
160 #
160 #
161 # supported types are:
161 # supported types are:
162 #
162 #
163 # :nodes: list of binary nodes
163 # :nodes: list of binary nodes
164 # :csv: list of comma-separated values
164 # :csv: list of comma-separated values
165 # :scsv: list of comma-separated values return as set
165 # :scsv: list of comma-separated values return as set
166 # :plain: string with no transformation needed.
166 # :plain: string with no transformation needed.
167 gboptsmap = {'heads': 'nodes',
167 gboptsmap = {'heads': 'nodes',
168 'bookmarks': 'boolean',
168 'bookmarks': 'boolean',
169 'common': 'nodes',
169 'common': 'nodes',
170 'obsmarkers': 'boolean',
170 'obsmarkers': 'boolean',
171 'phases': 'boolean',
171 'phases': 'boolean',
172 'bundlecaps': 'scsv',
172 'bundlecaps': 'scsv',
173 'listkeys': 'csv',
173 'listkeys': 'csv',
174 'cg': 'boolean',
174 'cg': 'boolean',
175 'cbattempted': 'boolean',
175 'cbattempted': 'boolean',
176 'stream': 'boolean',
176 'stream': 'boolean',
177 }
177 }
178
178
179 # client side
179 # client side
180
180
181 class wirepeer(repository.legacypeer):
181 class wirepeer(repository.legacypeer):
182 """Client-side interface for communicating with a peer repository.
182 """Client-side interface for communicating with a peer repository.
183
183
184 Methods commonly call wire protocol commands of the same name.
184 Methods commonly call wire protocol commands of the same name.
185
185
186 See also httppeer.py and sshpeer.py for protocol-specific
186 See also httppeer.py and sshpeer.py for protocol-specific
187 implementations of this interface.
187 implementations of this interface.
188 """
188 """
189 # Begin of basewirepeer interface.
189 # Begin of basewirepeer interface.
190
190
191 def iterbatch(self):
191 def iterbatch(self):
192 return remoteiterbatcher(self)
192 return remoteiterbatcher(self)
193
193
194 @batchable
194 @batchable
195 def lookup(self, key):
195 def lookup(self, key):
196 self.requirecap('lookup', _('look up remote revision'))
196 self.requirecap('lookup', _('look up remote revision'))
197 f = future()
197 f = future()
198 yield {'key': encoding.fromlocal(key)}, f
198 yield {'key': encoding.fromlocal(key)}, f
199 d = f.value
199 d = f.value
200 success, data = d[:-1].split(" ", 1)
200 success, data = d[:-1].split(" ", 1)
201 if int(success):
201 if int(success):
202 yield bin(data)
202 yield bin(data)
203 else:
203 else:
204 self._abort(error.RepoError(data))
204 self._abort(error.RepoError(data))
205
205
206 @batchable
206 @batchable
207 def heads(self):
207 def heads(self):
208 f = future()
208 f = future()
209 yield {}, f
209 yield {}, f
210 d = f.value
210 d = f.value
211 try:
211 try:
212 yield decodelist(d[:-1])
212 yield decodelist(d[:-1])
213 except ValueError:
213 except ValueError:
214 self._abort(error.ResponseError(_("unexpected response:"), d))
214 self._abort(error.ResponseError(_("unexpected response:"), d))
215
215
216 @batchable
216 @batchable
217 def known(self, nodes):
217 def known(self, nodes):
218 f = future()
218 f = future()
219 yield {'nodes': encodelist(nodes)}, f
219 yield {'nodes': encodelist(nodes)}, f
220 d = f.value
220 d = f.value
221 try:
221 try:
222 yield [bool(int(b)) for b in d]
222 yield [bool(int(b)) for b in d]
223 except ValueError:
223 except ValueError:
224 self._abort(error.ResponseError(_("unexpected response:"), d))
224 self._abort(error.ResponseError(_("unexpected response:"), d))
225
225
226 @batchable
226 @batchable
227 def branchmap(self):
227 def branchmap(self):
228 f = future()
228 f = future()
229 yield {}, f
229 yield {}, f
230 d = f.value
230 d = f.value
231 try:
231 try:
232 branchmap = {}
232 branchmap = {}
233 for branchpart in d.splitlines():
233 for branchpart in d.splitlines():
234 branchname, branchheads = branchpart.split(' ', 1)
234 branchname, branchheads = branchpart.split(' ', 1)
235 branchname = encoding.tolocal(urlreq.unquote(branchname))
235 branchname = encoding.tolocal(urlreq.unquote(branchname))
236 branchheads = decodelist(branchheads)
236 branchheads = decodelist(branchheads)
237 branchmap[branchname] = branchheads
237 branchmap[branchname] = branchheads
238 yield branchmap
238 yield branchmap
239 except TypeError:
239 except TypeError:
240 self._abort(error.ResponseError(_("unexpected response:"), d))
240 self._abort(error.ResponseError(_("unexpected response:"), d))
241
241
242 @batchable
242 @batchable
243 def listkeys(self, namespace):
243 def listkeys(self, namespace):
244 if not self.capable('pushkey'):
244 if not self.capable('pushkey'):
245 yield {}, None
245 yield {}, None
246 f = future()
246 f = future()
247 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
247 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
248 yield {'namespace': encoding.fromlocal(namespace)}, f
248 yield {'namespace': encoding.fromlocal(namespace)}, f
249 d = f.value
249 d = f.value
250 self.ui.debug('received listkey for "%s": %i bytes\n'
250 self.ui.debug('received listkey for "%s": %i bytes\n'
251 % (namespace, len(d)))
251 % (namespace, len(d)))
252 yield pushkeymod.decodekeys(d)
252 yield pushkeymod.decodekeys(d)
253
253
254 @batchable
254 @batchable
255 def pushkey(self, namespace, key, old, new):
255 def pushkey(self, namespace, key, old, new):
256 if not self.capable('pushkey'):
256 if not self.capable('pushkey'):
257 yield False, None
257 yield False, None
258 f = future()
258 f = future()
259 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
259 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
260 yield {'namespace': encoding.fromlocal(namespace),
260 yield {'namespace': encoding.fromlocal(namespace),
261 'key': encoding.fromlocal(key),
261 'key': encoding.fromlocal(key),
262 'old': encoding.fromlocal(old),
262 'old': encoding.fromlocal(old),
263 'new': encoding.fromlocal(new)}, f
263 'new': encoding.fromlocal(new)}, f
264 d = f.value
264 d = f.value
265 d, output = d.split('\n', 1)
265 d, output = d.split('\n', 1)
266 try:
266 try:
267 d = bool(int(d))
267 d = bool(int(d))
268 except ValueError:
268 except ValueError:
269 raise error.ResponseError(
269 raise error.ResponseError(
270 _('push failed (unexpected response):'), d)
270 _('push failed (unexpected response):'), d)
271 for l in output.splitlines(True):
271 for l in output.splitlines(True):
272 self.ui.status(_('remote: '), l)
272 self.ui.status(_('remote: '), l)
273 yield d
273 yield d
274
274
275 def stream_out(self):
275 def stream_out(self):
276 return self._callstream('stream_out')
276 return self._callstream('stream_out')
277
277
278 def getbundle(self, source, **kwargs):
278 def getbundle(self, source, **kwargs):
279 kwargs = pycompat.byteskwargs(kwargs)
279 kwargs = pycompat.byteskwargs(kwargs)
280 self.requirecap('getbundle', _('look up remote changes'))
280 self.requirecap('getbundle', _('look up remote changes'))
281 opts = {}
281 opts = {}
282 bundlecaps = kwargs.get('bundlecaps')
282 bundlecaps = kwargs.get('bundlecaps')
283 if bundlecaps is not None:
283 if bundlecaps is not None:
284 kwargs['bundlecaps'] = sorted(bundlecaps)
284 kwargs['bundlecaps'] = sorted(bundlecaps)
285 else:
285 else:
286 bundlecaps = () # kwargs could have it to None
286 bundlecaps = () # kwargs could have it to None
287 for key, value in kwargs.iteritems():
287 for key, value in kwargs.iteritems():
288 if value is None:
288 if value is None:
289 continue
289 continue
290 keytype = gboptsmap.get(key)
290 keytype = gboptsmap.get(key)
291 if keytype is None:
291 if keytype is None:
292 raise error.ProgrammingError(
292 raise error.ProgrammingError(
293 'Unexpectedly None keytype for key %s' % key)
293 'Unexpectedly None keytype for key %s' % key)
294 elif keytype == 'nodes':
294 elif keytype == 'nodes':
295 value = encodelist(value)
295 value = encodelist(value)
296 elif keytype in ('csv', 'scsv'):
296 elif keytype in ('csv', 'scsv'):
297 value = ','.join(value)
297 value = ','.join(value)
298 elif keytype == 'boolean':
298 elif keytype == 'boolean':
299 value = '%i' % bool(value)
299 value = '%i' % bool(value)
300 elif keytype != 'plain':
300 elif keytype != 'plain':
301 raise KeyError('unknown getbundle option type %s'
301 raise KeyError('unknown getbundle option type %s'
302 % keytype)
302 % keytype)
303 opts[key] = value
303 opts[key] = value
304 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
304 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
305 if any((cap.startswith('HG2') for cap in bundlecaps)):
305 if any((cap.startswith('HG2') for cap in bundlecaps)):
306 return bundle2.getunbundler(self.ui, f)
306 return bundle2.getunbundler(self.ui, f)
307 else:
307 else:
308 return changegroupmod.cg1unpacker(f, 'UN')
308 return changegroupmod.cg1unpacker(f, 'UN')
309
309
310 def unbundle(self, cg, heads, url):
310 def unbundle(self, cg, heads, url):
311 '''Send cg (a readable file-like object representing the
311 '''Send cg (a readable file-like object representing the
312 changegroup to push, typically a chunkbuffer object) to the
312 changegroup to push, typically a chunkbuffer object) to the
313 remote server as a bundle.
313 remote server as a bundle.
314
314
315 When pushing a bundle10 stream, return an integer indicating the
315 When pushing a bundle10 stream, return an integer indicating the
316 result of the push (see changegroup.apply()).
316 result of the push (see changegroup.apply()).
317
317
318 When pushing a bundle20 stream, return a bundle20 stream.
318 When pushing a bundle20 stream, return a bundle20 stream.
319
319
320 `url` is the url the client thinks it's pushing to, which is
320 `url` is the url the client thinks it's pushing to, which is
321 visible to hooks.
321 visible to hooks.
322 '''
322 '''
323
323
324 if heads != ['force'] and self.capable('unbundlehash'):
324 if heads != ['force'] and self.capable('unbundlehash'):
325 heads = encodelist(['hashed',
325 heads = encodelist(['hashed',
326 hashlib.sha1(''.join(sorted(heads))).digest()])
326 hashlib.sha1(''.join(sorted(heads))).digest()])
327 else:
327 else:
328 heads = encodelist(heads)
328 heads = encodelist(heads)
329
329
330 if util.safehasattr(cg, 'deltaheader'):
330 if util.safehasattr(cg, 'deltaheader'):
331 # this a bundle10, do the old style call sequence
331 # this a bundle10, do the old style call sequence
332 ret, output = self._callpush("unbundle", cg, heads=heads)
332 ret, output = self._callpush("unbundle", cg, heads=heads)
333 if ret == "":
333 if ret == "":
334 raise error.ResponseError(
334 raise error.ResponseError(
335 _('push failed:'), output)
335 _('push failed:'), output)
336 try:
336 try:
337 ret = int(ret)
337 ret = int(ret)
338 except ValueError:
338 except ValueError:
339 raise error.ResponseError(
339 raise error.ResponseError(
340 _('push failed (unexpected response):'), ret)
340 _('push failed (unexpected response):'), ret)
341
341
342 for l in output.splitlines(True):
342 for l in output.splitlines(True):
343 self.ui.status(_('remote: '), l)
343 self.ui.status(_('remote: '), l)
344 else:
344 else:
345 # bundle2 push. Send a stream, fetch a stream.
345 # bundle2 push. Send a stream, fetch a stream.
346 stream = self._calltwowaystream('unbundle', cg, heads=heads)
346 stream = self._calltwowaystream('unbundle', cg, heads=heads)
347 ret = bundle2.getunbundler(self.ui, stream)
347 ret = bundle2.getunbundler(self.ui, stream)
348 return ret
348 return ret
349
349
350 # End of basewirepeer interface.
350 # End of basewirepeer interface.
351
351
352 # Begin of baselegacywirepeer interface.
352 # Begin of baselegacywirepeer interface.
353
353
354 def branches(self, nodes):
354 def branches(self, nodes):
355 n = encodelist(nodes)
355 n = encodelist(nodes)
356 d = self._call("branches", nodes=n)
356 d = self._call("branches", nodes=n)
357 try:
357 try:
358 br = [tuple(decodelist(b)) for b in d.splitlines()]
358 br = [tuple(decodelist(b)) for b in d.splitlines()]
359 return br
359 return br
360 except ValueError:
360 except ValueError:
361 self._abort(error.ResponseError(_("unexpected response:"), d))
361 self._abort(error.ResponseError(_("unexpected response:"), d))
362
362
363 def between(self, pairs):
363 def between(self, pairs):
364 batch = 8 # avoid giant requests
364 batch = 8 # avoid giant requests
365 r = []
365 r = []
366 for i in xrange(0, len(pairs), batch):
366 for i in xrange(0, len(pairs), batch):
367 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
367 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
368 d = self._call("between", pairs=n)
368 d = self._call("between", pairs=n)
369 try:
369 try:
370 r.extend(l and decodelist(l) or [] for l in d.splitlines())
370 r.extend(l and decodelist(l) or [] for l in d.splitlines())
371 except ValueError:
371 except ValueError:
372 self._abort(error.ResponseError(_("unexpected response:"), d))
372 self._abort(error.ResponseError(_("unexpected response:"), d))
373 return r
373 return r
374
374
375 def changegroup(self, nodes, kind):
375 def changegroup(self, nodes, kind):
376 n = encodelist(nodes)
376 n = encodelist(nodes)
377 f = self._callcompressable("changegroup", roots=n)
377 f = self._callcompressable("changegroup", roots=n)
378 return changegroupmod.cg1unpacker(f, 'UN')
378 return changegroupmod.cg1unpacker(f, 'UN')
379
379
380 def changegroupsubset(self, bases, heads, kind):
380 def changegroupsubset(self, bases, heads, kind):
381 self.requirecap('changegroupsubset', _('look up remote changes'))
381 self.requirecap('changegroupsubset', _('look up remote changes'))
382 bases = encodelist(bases)
382 bases = encodelist(bases)
383 heads = encodelist(heads)
383 heads = encodelist(heads)
384 f = self._callcompressable("changegroupsubset",
384 f = self._callcompressable("changegroupsubset",
385 bases=bases, heads=heads)
385 bases=bases, heads=heads)
386 return changegroupmod.cg1unpacker(f, 'UN')
386 return changegroupmod.cg1unpacker(f, 'UN')
387
387
388 # End of baselegacywirepeer interface.
388 # End of baselegacywirepeer interface.
389
389
390 def _submitbatch(self, req):
390 def _submitbatch(self, req):
391 """run batch request <req> on the server
391 """run batch request <req> on the server
392
392
393 Returns an iterator of the raw responses from the server.
393 Returns an iterator of the raw responses from the server.
394 """
394 """
395 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
395 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
396 chunk = rsp.read(1024)
396 chunk = rsp.read(1024)
397 work = [chunk]
397 work = [chunk]
398 while chunk:
398 while chunk:
399 while ';' not in chunk and chunk:
399 while ';' not in chunk and chunk:
400 chunk = rsp.read(1024)
400 chunk = rsp.read(1024)
401 work.append(chunk)
401 work.append(chunk)
402 merged = ''.join(work)
402 merged = ''.join(work)
403 while ';' in merged:
403 while ';' in merged:
404 one, merged = merged.split(';', 1)
404 one, merged = merged.split(';', 1)
405 yield unescapearg(one)
405 yield unescapearg(one)
406 chunk = rsp.read(1024)
406 chunk = rsp.read(1024)
407 work = [merged, chunk]
407 work = [merged, chunk]
408 yield unescapearg(''.join(work))
408 yield unescapearg(''.join(work))
409
409
410 def _submitone(self, op, args):
410 def _submitone(self, op, args):
411 return self._call(op, **pycompat.strkwargs(args))
411 return self._call(op, **pycompat.strkwargs(args))
412
412
413 def debugwireargs(self, one, two, three=None, four=None, five=None):
413 def debugwireargs(self, one, two, three=None, four=None, five=None):
414 # don't pass optional arguments left at their default value
414 # don't pass optional arguments left at their default value
415 opts = {}
415 opts = {}
416 if three is not None:
416 if three is not None:
417 opts[r'three'] = three
417 opts[r'three'] = three
418 if four is not None:
418 if four is not None:
419 opts[r'four'] = four
419 opts[r'four'] = four
420 return self._call('debugwireargs', one=one, two=two, **opts)
420 return self._call('debugwireargs', one=one, two=two, **opts)
421
421
422 def _call(self, cmd, **args):
422 def _call(self, cmd, **args):
423 """execute <cmd> on the server
423 """execute <cmd> on the server
424
424
425 The command is expected to return a simple string.
425 The command is expected to return a simple string.
426
426
427 returns the server reply as a string."""
427 returns the server reply as a string."""
428 raise NotImplementedError()
428 raise NotImplementedError()
429
429
430 def _callstream(self, cmd, **args):
430 def _callstream(self, cmd, **args):
431 """execute <cmd> on the server
431 """execute <cmd> on the server
432
432
433 The command is expected to return a stream. Note that if the
433 The command is expected to return a stream. Note that if the
434 command doesn't return a stream, _callstream behaves
434 command doesn't return a stream, _callstream behaves
435 differently for ssh and http peers.
435 differently for ssh and http peers.
436
436
437 returns the server reply as a file like object.
437 returns the server reply as a file like object.
438 """
438 """
439 raise NotImplementedError()
439 raise NotImplementedError()
440
440
441 def _callcompressable(self, cmd, **args):
441 def _callcompressable(self, cmd, **args):
442 """execute <cmd> on the server
442 """execute <cmd> on the server
443
443
444 The command is expected to return a stream.
444 The command is expected to return a stream.
445
445
446 The stream may have been compressed in some implementations. This
446 The stream may have been compressed in some implementations. This
447 function takes care of the decompression. This is the only difference
447 function takes care of the decompression. This is the only difference
448 with _callstream.
448 with _callstream.
449
449
450 returns the server reply as a file like object.
450 returns the server reply as a file like object.
451 """
451 """
452 raise NotImplementedError()
452 raise NotImplementedError()
453
453
454 def _callpush(self, cmd, fp, **args):
454 def _callpush(self, cmd, fp, **args):
455 """execute a <cmd> on server
455 """execute a <cmd> on server
456
456
457 The command is expected to be related to a push. Push has a special
457 The command is expected to be related to a push. Push has a special
458 return method.
458 return method.
459
459
460 returns the server reply as a (ret, output) tuple. ret is either
460 returns the server reply as a (ret, output) tuple. ret is either
461 empty (error) or a stringified int.
461 empty (error) or a stringified int.
462 """
462 """
463 raise NotImplementedError()
463 raise NotImplementedError()
464
464
465 def _calltwowaystream(self, cmd, fp, **args):
465 def _calltwowaystream(self, cmd, fp, **args):
466 """execute <cmd> on server
466 """execute <cmd> on server
467
467
468 The command will send a stream to the server and get a stream in reply.
468 The command will send a stream to the server and get a stream in reply.
469 """
469 """
470 raise NotImplementedError()
470 raise NotImplementedError()
471
471
472 def _abort(self, exception):
472 def _abort(self, exception):
473 """clearly abort the wire protocol connection and raise the exception
473 """clearly abort the wire protocol connection and raise the exception
474 """
474 """
475 raise NotImplementedError()
475 raise NotImplementedError()
476
476
477 # server side
477 # server side
478
478
479 # wire protocol command can either return a string or one of these classes.
479 # wire protocol command can either return a string or one of these classes.
480 class streamres(object):
480 class streamres(object):
481 """wireproto reply: binary stream
481 """wireproto reply: binary stream
482
482
483 The call was successful and the result is a stream.
483 The call was successful and the result is a stream.
484
484
485 Accepts a generator containing chunks of data to be sent to the client.
485 Accepts a generator containing chunks of data to be sent to the client.
486
486
487 ``prefer_uncompressed`` indicates that the data is expected to be
487 ``prefer_uncompressed`` indicates that the data is expected to be
488 uncompressable and that the stream should therefore use the ``none``
488 uncompressable and that the stream should therefore use the ``none``
489 engine.
489 engine.
490 """
490 """
491 def __init__(self, gen=None, prefer_uncompressed=False):
491 def __init__(self, gen=None, prefer_uncompressed=False):
492 self.gen = gen
492 self.gen = gen
493 self.prefer_uncompressed = prefer_uncompressed
493 self.prefer_uncompressed = prefer_uncompressed
494
494
495 class streamres_legacy(object):
495 class streamres_legacy(object):
496 """wireproto reply: uncompressed binary stream
496 """wireproto reply: uncompressed binary stream
497
497
498 The call was successful and the result is a stream.
498 The call was successful and the result is a stream.
499
499
500 Accepts a generator containing chunks of data to be sent to the client.
500 Accepts a generator containing chunks of data to be sent to the client.
501
501
502 Like ``streamres``, but sends an uncompressed data for "version 1" clients
502 Like ``streamres``, but sends an uncompressed data for "version 1" clients
503 using the application/mercurial-0.1 media type.
503 using the application/mercurial-0.1 media type.
504 """
504 """
505 def __init__(self, gen=None):
505 def __init__(self, gen=None):
506 self.gen = gen
506 self.gen = gen
507
507
508 class pushres(object):
508 class pushres(object):
509 """wireproto reply: success with simple integer return
509 """wireproto reply: success with simple integer return
510
510
511 The call was successful and returned an integer contained in `self.res`.
511 The call was successful and returned an integer contained in `self.res`.
512 """
512 """
513 def __init__(self, res, output):
513 def __init__(self, res, output):
514 self.res = res
514 self.res = res
515 self.output = output
515 self.output = output
516
516
517 class pusherr(object):
517 class pusherr(object):
518 """wireproto reply: failure
518 """wireproto reply: failure
519
519
520 The call failed. The `self.res` attribute contains the error message.
520 The call failed. The `self.res` attribute contains the error message.
521 """
521 """
522 def __init__(self, res, output):
522 def __init__(self, res, output):
523 self.res = res
523 self.res = res
524 self.output = output
524 self.output = output
525
525
526 class ooberror(object):
526 class ooberror(object):
527 """wireproto reply: failure of a batch of operation
527 """wireproto reply: failure of a batch of operation
528
528
529 Something failed during a batch call. The error message is stored in
529 Something failed during a batch call. The error message is stored in
530 `self.message`.
530 `self.message`.
531 """
531 """
532 def __init__(self, message):
532 def __init__(self, message):
533 self.message = message
533 self.message = message
534
534
535 def getdispatchrepo(repo, proto, command):
535 def getdispatchrepo(repo, proto, command):
536 """Obtain the repo used for processing wire protocol commands.
536 """Obtain the repo used for processing wire protocol commands.
537
537
538 The intent of this function is to serve as a monkeypatch point for
538 The intent of this function is to serve as a monkeypatch point for
539 extensions that need commands to operate on different repo views under
539 extensions that need commands to operate on different repo views under
540 specialized circumstances.
540 specialized circumstances.
541 """
541 """
542 return repo.filtered('served')
542 return repo.filtered('served')
543
543
544 def dispatch(repo, proto, command):
544 def dispatch(repo, proto, command):
545 repo = getdispatchrepo(repo, proto, command)
545 repo = getdispatchrepo(repo, proto, command)
546 func, spec = commands[command]
546 func, spec = commands[command]
547 args = proto.getargs(spec)
547 args = proto.getargs(spec)
548 return func(repo, proto, *args)
548 return func(repo, proto, *args)
549
549
550 def options(cmd, keys, others):
550 def options(cmd, keys, others):
551 opts = {}
551 opts = {}
552 for k in keys:
552 for k in keys:
553 if k in others:
553 if k in others:
554 opts[k] = others[k]
554 opts[k] = others[k]
555 del others[k]
555 del others[k]
556 if others:
556 if others:
557 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
557 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
558 % (cmd, ",".join(others)))
558 % (cmd, ",".join(others)))
559 return opts
559 return opts
560
560
561 def bundle1allowed(repo, action):
561 def bundle1allowed(repo, action):
562 """Whether a bundle1 operation is allowed from the server.
562 """Whether a bundle1 operation is allowed from the server.
563
563
564 Priority is:
564 Priority is:
565
565
566 1. server.bundle1gd.<action> (if generaldelta active)
566 1. server.bundle1gd.<action> (if generaldelta active)
567 2. server.bundle1.<action>
567 2. server.bundle1.<action>
568 3. server.bundle1gd (if generaldelta active)
568 3. server.bundle1gd (if generaldelta active)
569 4. server.bundle1
569 4. server.bundle1
570 """
570 """
571 ui = repo.ui
571 ui = repo.ui
572 gd = 'generaldelta' in repo.requirements
572 gd = 'generaldelta' in repo.requirements
573
573
574 if gd:
574 if gd:
575 v = ui.configbool('server', 'bundle1gd.%s' % action)
575 v = ui.configbool('server', 'bundle1gd.%s' % action)
576 if v is not None:
576 if v is not None:
577 return v
577 return v
578
578
579 v = ui.configbool('server', 'bundle1.%s' % action)
579 v = ui.configbool('server', 'bundle1.%s' % action)
580 if v is not None:
580 if v is not None:
581 return v
581 return v
582
582
583 if gd:
583 if gd:
584 v = ui.configbool('server', 'bundle1gd')
584 v = ui.configbool('server', 'bundle1gd')
585 if v is not None:
585 if v is not None:
586 return v
586 return v
587
587
588 return ui.configbool('server', 'bundle1')
588 return ui.configbool('server', 'bundle1')
589
589
590 def supportedcompengines(ui, proto, role):
590 def supportedcompengines(ui, proto, role):
591 """Obtain the list of supported compression engines for a request."""
591 """Obtain the list of supported compression engines for a request."""
592 assert role in (util.CLIENTROLE, util.SERVERROLE)
592 assert role in (util.CLIENTROLE, util.SERVERROLE)
593
593
594 compengines = util.compengines.supportedwireengines(role)
594 compengines = util.compengines.supportedwireengines(role)
595
595
596 # Allow config to override default list and ordering.
596 # Allow config to override default list and ordering.
597 if role == util.SERVERROLE:
597 if role == util.SERVERROLE:
598 configengines = ui.configlist('server', 'compressionengines')
598 configengines = ui.configlist('server', 'compressionengines')
599 config = 'server.compressionengines'
599 config = 'server.compressionengines'
600 else:
600 else:
601 # This is currently implemented mainly to facilitate testing. In most
601 # This is currently implemented mainly to facilitate testing. In most
602 # cases, the server should be in charge of choosing a compression engine
602 # cases, the server should be in charge of choosing a compression engine
603 # because a server has the most to lose from a sub-optimal choice. (e.g.
603 # because a server has the most to lose from a sub-optimal choice. (e.g.
604 # CPU DoS due to an expensive engine or a network DoS due to poor
604 # CPU DoS due to an expensive engine or a network DoS due to poor
605 # compression ratio).
605 # compression ratio).
606 configengines = ui.configlist('experimental',
606 configengines = ui.configlist('experimental',
607 'clientcompressionengines')
607 'clientcompressionengines')
608 config = 'experimental.clientcompressionengines'
608 config = 'experimental.clientcompressionengines'
609
609
610 # No explicit config. Filter out the ones that aren't supposed to be
610 # No explicit config. Filter out the ones that aren't supposed to be
611 # advertised and return default ordering.
611 # advertised and return default ordering.
612 if not configengines:
612 if not configengines:
613 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
613 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
614 return [e for e in compengines
614 return [e for e in compengines
615 if getattr(e.wireprotosupport(), attr) > 0]
615 if getattr(e.wireprotosupport(), attr) > 0]
616
616
617 # If compression engines are listed in the config, assume there is a good
617 # If compression engines are listed in the config, assume there is a good
618 # reason for it (like server operators wanting to achieve specific
618 # reason for it (like server operators wanting to achieve specific
619 # performance characteristics). So fail fast if the config references
619 # performance characteristics). So fail fast if the config references
620 # unusable compression engines.
620 # unusable compression engines.
621 validnames = set(e.name() for e in compengines)
621 validnames = set(e.name() for e in compengines)
622 invalidnames = set(e for e in configengines if e not in validnames)
622 invalidnames = set(e for e in configengines if e not in validnames)
623 if invalidnames:
623 if invalidnames:
624 raise error.Abort(_('invalid compression engine defined in %s: %s') %
624 raise error.Abort(_('invalid compression engine defined in %s: %s') %
625 (config, ', '.join(sorted(invalidnames))))
625 (config, ', '.join(sorted(invalidnames))))
626
626
627 compengines = [e for e in compengines if e.name() in configengines]
627 compengines = [e for e in compengines if e.name() in configengines]
628 compengines = sorted(compengines,
628 compengines = sorted(compengines,
629 key=lambda e: configengines.index(e.name()))
629 key=lambda e: configengines.index(e.name()))
630
630
631 if not compengines:
631 if not compengines:
632 raise error.Abort(_('%s config option does not specify any known '
632 raise error.Abort(_('%s config option does not specify any known '
633 'compression engines') % config,
633 'compression engines') % config,
634 hint=_('usable compression engines: %s') %
634 hint=_('usable compression engines: %s') %
635 ', '.sorted(validnames))
635 ', '.sorted(validnames))
636
636
637 return compengines
637 return compengines
638
638
639 class commandentry(object):
639 class commandentry(object):
640 """Represents a declared wire protocol command."""
640 """Represents a declared wire protocol command."""
641 def __init__(self, func, args=''):
641 def __init__(self, func, args=''):
642 self.func = func
642 self.func = func
643 self.args = args
643 self.args = args
644
644
645 def _merge(self, func, args):
645 def _merge(self, func, args):
646 """Merge this instance with an incoming 2-tuple.
646 """Merge this instance with an incoming 2-tuple.
647
647
648 This is called when a caller using the old 2-tuple API attempts
648 This is called when a caller using the old 2-tuple API attempts
649 to replace an instance. The incoming values are merged with
649 to replace an instance. The incoming values are merged with
650 data not captured by the 2-tuple and a new instance containing
650 data not captured by the 2-tuple and a new instance containing
651 the union of the two objects is returned.
651 the union of the two objects is returned.
652 """
652 """
653 return commandentry(func, args)
653 return commandentry(func, args)
654
654
655 # Old code treats instances as 2-tuples. So expose that interface.
655 # Old code treats instances as 2-tuples. So expose that interface.
656 def __iter__(self):
656 def __iter__(self):
657 yield self.func
657 yield self.func
658 yield self.args
658 yield self.args
659
659
660 def __getitem__(self, i):
660 def __getitem__(self, i):
661 if i == 0:
661 if i == 0:
662 return self.func
662 return self.func
663 elif i == 1:
663 elif i == 1:
664 return self.args
664 return self.args
665 else:
665 else:
666 raise IndexError('can only access elements 0 and 1')
666 raise IndexError('can only access elements 0 and 1')
667
667
668 class commanddict(dict):
668 class commanddict(dict):
669 """Container for registered wire protocol commands.
669 """Container for registered wire protocol commands.
670
670
671 It behaves like a dict. But __setitem__ is overwritten to allow silent
671 It behaves like a dict. But __setitem__ is overwritten to allow silent
672 coercion of values from 2-tuples for API compatibility.
672 coercion of values from 2-tuples for API compatibility.
673 """
673 """
674 def __setitem__(self, k, v):
674 def __setitem__(self, k, v):
675 if isinstance(v, commandentry):
675 if isinstance(v, commandentry):
676 pass
676 pass
677 # Cast 2-tuples to commandentry instances.
677 # Cast 2-tuples to commandentry instances.
678 elif isinstance(v, tuple):
678 elif isinstance(v, tuple):
679 if len(v) != 2:
679 if len(v) != 2:
680 raise ValueError('command tuples must have exactly 2 elements')
680 raise ValueError('command tuples must have exactly 2 elements')
681
681
682 # It is common for extensions to wrap wire protocol commands via
682 # It is common for extensions to wrap wire protocol commands via
683 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
683 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
684 # doing this aren't aware of the new API that uses objects to store
684 # doing this aren't aware of the new API that uses objects to store
685 # command entries, we automatically merge old state with new.
685 # command entries, we automatically merge old state with new.
686 if k in self:
686 if k in self:
687 v = self[k]._merge(v[0], v[1])
687 v = self[k]._merge(v[0], v[1])
688 else:
688 else:
689 v = commandentry(v[0], v[1])
689 v = commandentry(v[0], v[1])
690 else:
690 else:
691 raise ValueError('command entries must be commandentry instances '
691 raise ValueError('command entries must be commandentry instances '
692 'or 2-tuples')
692 'or 2-tuples')
693
693
694 return super(commanddict, self).__setitem__(k, v)
694 return super(commanddict, self).__setitem__(k, v)
695
695
696 def commandavailable(self, command, proto):
696 def commandavailable(self, command, proto):
697 """Determine if a command is available for the requested protocol."""
697 """Determine if a command is available for the requested protocol."""
698 # For now, commands are available for all protocols. So do a simple
698 # For now, commands are available for all protocols. So do a simple
699 # membership test.
699 # membership test.
700 return command in self
700 return command in self
701
701
702 commands = commanddict()
702 commands = commanddict()
703
703
704 def wireprotocommand(name, args=''):
704 def wireprotocommand(name, args=''):
705 """Decorator to declare a wire protocol command.
705 """Decorator to declare a wire protocol command.
706
706
707 ``name`` is the name of the wire protocol command being provided.
707 ``name`` is the name of the wire protocol command being provided.
708
708
709 ``args`` is a space-delimited list of named arguments that the command
709 ``args`` is a space-delimited list of named arguments that the command
710 accepts. ``*`` is a special value that says to accept all arguments.
710 accepts. ``*`` is a special value that says to accept all arguments.
711 """
711 """
712 def register(func):
712 def register(func):
713 commands[name] = commandentry(func, args)
713 commands[name] = commandentry(func, args)
714 return func
714 return func
715 return register
715 return register
716
716
717 @wireprotocommand('batch', 'cmds *')
717 @wireprotocommand('batch', 'cmds *')
718 def batch(repo, proto, cmds, others):
718 def batch(repo, proto, cmds, others):
719 repo = repo.filtered("served")
719 repo = repo.filtered("served")
720 res = []
720 res = []
721 for pair in cmds.split(';'):
721 for pair in cmds.split(';'):
722 op, args = pair.split(' ', 1)
722 op, args = pair.split(' ', 1)
723 vals = {}
723 vals = {}
724 for a in args.split(','):
724 for a in args.split(','):
725 if a:
725 if a:
726 n, v = a.split('=')
726 n, v = a.split('=')
727 vals[unescapearg(n)] = unescapearg(v)
727 vals[unescapearg(n)] = unescapearg(v)
728 func, spec = commands[op]
728 func, spec = commands[op]
729 if spec:
729 if spec:
730 keys = spec.split()
730 keys = spec.split()
731 data = {}
731 data = {}
732 for k in keys:
732 for k in keys:
733 if k == '*':
733 if k == '*':
734 star = {}
734 star = {}
735 for key in vals.keys():
735 for key in vals.keys():
736 if key not in keys:
736 if key not in keys:
737 star[key] = vals[key]
737 star[key] = vals[key]
738 data['*'] = star
738 data['*'] = star
739 else:
739 else:
740 data[k] = vals[k]
740 data[k] = vals[k]
741 result = func(repo, proto, *[data[k] for k in keys])
741 result = func(repo, proto, *[data[k] for k in keys])
742 else:
742 else:
743 result = func(repo, proto)
743 result = func(repo, proto)
744 if isinstance(result, ooberror):
744 if isinstance(result, ooberror):
745 return result
745 return result
746 res.append(escapearg(result))
746 res.append(escapearg(result))
747 return ';'.join(res)
747 return ';'.join(res)
748
748
749 @wireprotocommand('between', 'pairs')
749 @wireprotocommand('between', 'pairs')
750 def between(repo, proto, pairs):
750 def between(repo, proto, pairs):
751 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
751 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
752 r = []
752 r = []
753 for b in repo.between(pairs):
753 for b in repo.between(pairs):
754 r.append(encodelist(b) + "\n")
754 r.append(encodelist(b) + "\n")
755 return "".join(r)
755 return "".join(r)
756
756
757 @wireprotocommand('branchmap')
757 @wireprotocommand('branchmap')
758 def branchmap(repo, proto):
758 def branchmap(repo, proto):
759 branchmap = repo.branchmap()
759 branchmap = repo.branchmap()
760 heads = []
760 heads = []
761 for branch, nodes in branchmap.iteritems():
761 for branch, nodes in branchmap.iteritems():
762 branchname = urlreq.quote(encoding.fromlocal(branch))
762 branchname = urlreq.quote(encoding.fromlocal(branch))
763 branchnodes = encodelist(nodes)
763 branchnodes = encodelist(nodes)
764 heads.append('%s %s' % (branchname, branchnodes))
764 heads.append('%s %s' % (branchname, branchnodes))
765 return '\n'.join(heads)
765 return '\n'.join(heads)
766
766
767 @wireprotocommand('branches', 'nodes')
767 @wireprotocommand('branches', 'nodes')
768 def branches(repo, proto, nodes):
768 def branches(repo, proto, nodes):
769 nodes = decodelist(nodes)
769 nodes = decodelist(nodes)
770 r = []
770 r = []
771 for b in repo.branches(nodes):
771 for b in repo.branches(nodes):
772 r.append(encodelist(b) + "\n")
772 r.append(encodelist(b) + "\n")
773 return "".join(r)
773 return "".join(r)
774
774
775 @wireprotocommand('clonebundles', '')
775 @wireprotocommand('clonebundles', '')
776 def clonebundles(repo, proto):
776 def clonebundles(repo, proto):
777 """Server command for returning info for available bundles to seed clones.
777 """Server command for returning info for available bundles to seed clones.
778
778
779 Clients will parse this response and determine what bundle to fetch.
779 Clients will parse this response and determine what bundle to fetch.
780
780
781 Extensions may wrap this command to filter or dynamically emit data
781 Extensions may wrap this command to filter or dynamically emit data
782 depending on the request. e.g. you could advertise URLs for the closest
782 depending on the request. e.g. you could advertise URLs for the closest
783 data center given the client's IP address.
783 data center given the client's IP address.
784 """
784 """
785 return repo.vfs.tryread('clonebundles.manifest')
785 return repo.vfs.tryread('clonebundles.manifest')
786
786
787 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
787 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
788 'known', 'getbundle', 'unbundlehash', 'batch']
788 'known', 'getbundle', 'unbundlehash', 'batch']
789
789
790 def _capabilities(repo, proto):
790 def _capabilities(repo, proto):
791 """return a list of capabilities for a repo
791 """return a list of capabilities for a repo
792
792
793 This function exists to allow extensions to easily wrap capabilities
793 This function exists to allow extensions to easily wrap capabilities
794 computation
794 computation
795
795
796 - returns a lists: easy to alter
796 - returns a lists: easy to alter
797 - change done here will be propagated to both `capabilities` and `hello`
797 - change done here will be propagated to both `capabilities` and `hello`
798 command without any other action needed.
798 command without any other action needed.
799 """
799 """
800 # copy to prevent modification of the global list
800 # copy to prevent modification of the global list
801 caps = list(wireprotocaps)
801 caps = list(wireprotocaps)
802 if streamclone.allowservergeneration(repo):
802 if streamclone.allowservergeneration(repo):
803 if repo.ui.configbool('server', 'preferuncompressed'):
803 if repo.ui.configbool('server', 'preferuncompressed'):
804 caps.append('stream-preferred')
804 caps.append('stream-preferred')
805 requiredformats = repo.requirements & repo.supportedformats
805 requiredformats = repo.requirements & repo.supportedformats
806 # if our local revlogs are just revlogv1, add 'stream' cap
806 # if our local revlogs are just revlogv1, add 'stream' cap
807 if not requiredformats - {'revlogv1'}:
807 if not requiredformats - {'revlogv1'}:
808 caps.append('stream')
808 caps.append('stream')
809 # otherwise, add 'streamreqs' detailing our local revlog format
809 # otherwise, add 'streamreqs' detailing our local revlog format
810 else:
810 else:
811 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
811 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
812 if repo.ui.configbool('experimental', 'bundle2-advertise'):
812 if repo.ui.configbool('experimental', 'bundle2-advertise'):
813 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
813 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
814 caps.append('bundle2=' + urlreq.quote(capsblob))
814 caps.append('bundle2=' + urlreq.quote(capsblob))
815 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
815 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
816
816
817 if proto.name == 'http':
817 if proto.name == 'http':
818 caps.append('httpheader=%d' %
818 caps.append('httpheader=%d' %
819 repo.ui.configint('server', 'maxhttpheaderlen'))
819 repo.ui.configint('server', 'maxhttpheaderlen'))
820 if repo.ui.configbool('experimental', 'httppostargs'):
820 if repo.ui.configbool('experimental', 'httppostargs'):
821 caps.append('httppostargs')
821 caps.append('httppostargs')
822
822
823 # FUTURE advertise 0.2rx once support is implemented
823 # FUTURE advertise 0.2rx once support is implemented
824 # FUTURE advertise minrx and mintx after consulting config option
824 # FUTURE advertise minrx and mintx after consulting config option
825 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
825 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
826
826
827 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
827 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
828 if compengines:
828 if compengines:
829 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
829 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
830 for e in compengines)
830 for e in compengines)
831 caps.append('compression=%s' % comptypes)
831 caps.append('compression=%s' % comptypes)
832
832
833 return caps
833 return caps
834
834
835 # If you are writing an extension and consider wrapping this function. Wrap
835 # If you are writing an extension and consider wrapping this function. Wrap
836 # `_capabilities` instead.
836 # `_capabilities` instead.
837 @wireprotocommand('capabilities')
837 @wireprotocommand('capabilities')
838 def capabilities(repo, proto):
838 def capabilities(repo, proto):
839 return ' '.join(_capabilities(repo, proto))
839 return ' '.join(_capabilities(repo, proto))
840
840
841 @wireprotocommand('changegroup', 'roots')
841 @wireprotocommand('changegroup', 'roots')
842 def changegroup(repo, proto, roots):
842 def changegroup(repo, proto, roots):
843 nodes = decodelist(roots)
843 nodes = decodelist(roots)
844 outgoing = discovery.outgoing(repo, missingroots=nodes,
844 outgoing = discovery.outgoing(repo, missingroots=nodes,
845 missingheads=repo.heads())
845 missingheads=repo.heads())
846 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
846 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
847 gen = iter(lambda: cg.read(32768), '')
847 gen = iter(lambda: cg.read(32768), '')
848 return streamres(gen=gen)
848 return streamres(gen=gen)
849
849
850 @wireprotocommand('changegroupsubset', 'bases heads')
850 @wireprotocommand('changegroupsubset', 'bases heads')
851 def changegroupsubset(repo, proto, bases, heads):
851 def changegroupsubset(repo, proto, bases, heads):
852 bases = decodelist(bases)
852 bases = decodelist(bases)
853 heads = decodelist(heads)
853 heads = decodelist(heads)
854 outgoing = discovery.outgoing(repo, missingroots=bases,
854 outgoing = discovery.outgoing(repo, missingroots=bases,
855 missingheads=heads)
855 missingheads=heads)
856 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
856 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
857 gen = iter(lambda: cg.read(32768), '')
857 gen = iter(lambda: cg.read(32768), '')
858 return streamres(gen=gen)
858 return streamres(gen=gen)
859
859
860 @wireprotocommand('debugwireargs', 'one two *')
860 @wireprotocommand('debugwireargs', 'one two *')
861 def debugwireargs(repo, proto, one, two, others):
861 def debugwireargs(repo, proto, one, two, others):
862 # only accept optional args from the known set
862 # only accept optional args from the known set
863 opts = options('debugwireargs', ['three', 'four'], others)
863 opts = options('debugwireargs', ['three', 'four'], others)
864 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
864 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
865
865
866 @wireprotocommand('getbundle', '*')
866 @wireprotocommand('getbundle', '*')
867 def getbundle(repo, proto, others):
867 def getbundle(repo, proto, others):
868 opts = options('getbundle', gboptsmap.keys(), others)
868 opts = options('getbundle', gboptsmap.keys(), others)
869 for k, v in opts.iteritems():
869 for k, v in opts.iteritems():
870 keytype = gboptsmap[k]
870 keytype = gboptsmap[k]
871 if keytype == 'nodes':
871 if keytype == 'nodes':
872 opts[k] = decodelist(v)
872 opts[k] = decodelist(v)
873 elif keytype == 'csv':
873 elif keytype == 'csv':
874 opts[k] = list(v.split(','))
874 opts[k] = list(v.split(','))
875 elif keytype == 'scsv':
875 elif keytype == 'scsv':
876 opts[k] = set(v.split(','))
876 opts[k] = set(v.split(','))
877 elif keytype == 'boolean':
877 elif keytype == 'boolean':
878 # Client should serialize False as '0', which is a non-empty string
878 # Client should serialize False as '0', which is a non-empty string
879 # so it evaluates as a True bool.
879 # so it evaluates as a True bool.
880 if v == '0':
880 if v == '0':
881 opts[k] = False
881 opts[k] = False
882 else:
882 else:
883 opts[k] = bool(v)
883 opts[k] = bool(v)
884 elif keytype != 'plain':
884 elif keytype != 'plain':
885 raise KeyError('unknown getbundle option type %s'
885 raise KeyError('unknown getbundle option type %s'
886 % keytype)
886 % keytype)
887
887
888 if not bundle1allowed(repo, 'pull'):
888 if not bundle1allowed(repo, 'pull'):
889 if not exchange.bundle2requested(opts.get('bundlecaps')):
889 if not exchange.bundle2requested(opts.get('bundlecaps')):
890 if proto.name == 'http':
890 if proto.name == 'http':
891 return ooberror(bundle2required)
891 return ooberror(bundle2required)
892 raise error.Abort(bundle2requiredmain,
892 raise error.Abort(bundle2requiredmain,
893 hint=bundle2requiredhint)
893 hint=bundle2requiredhint)
894
894
895 prefercompressed = True
895 prefercompressed = True
896
896
897 try:
897 try:
898 if repo.ui.configbool('server', 'disablefullbundle'):
898 if repo.ui.configbool('server', 'disablefullbundle'):
899 # Check to see if this is a full clone.
899 # Check to see if this is a full clone.
900 clheads = set(repo.changelog.heads())
900 clheads = set(repo.changelog.heads())
901 changegroup = opts.get('cg', True)
901 changegroup = opts.get('cg', True)
902 heads = set(opts.get('heads', set()))
902 heads = set(opts.get('heads', set()))
903 common = set(opts.get('common', set()))
903 common = set(opts.get('common', set()))
904 common.discard(nullid)
904 common.discard(nullid)
905 if changegroup and not common and clheads == heads:
905 if changegroup and not common and clheads == heads:
906 raise error.Abort(
906 raise error.Abort(
907 _('server has pull-based clones disabled'),
907 _('server has pull-based clones disabled'),
908 hint=_('remove --pull if specified or upgrade Mercurial'))
908 hint=_('remove --pull if specified or upgrade Mercurial'))
909
909
910 info, chunks = exchange.getbundlechunks(repo, 'serve',
910 info, chunks = exchange.getbundlechunks(repo, 'serve',
911 **pycompat.strkwargs(opts))
911 **pycompat.strkwargs(opts))
912 prefercompressed = info.get('prefercompressed', True)
912 prefercompressed = info.get('prefercompressed', True)
913 except error.Abort as exc:
913 except error.Abort as exc:
914 # cleanly forward Abort error to the client
914 # cleanly forward Abort error to the client
915 if not exchange.bundle2requested(opts.get('bundlecaps')):
915 if not exchange.bundle2requested(opts.get('bundlecaps')):
916 if proto.name == 'http':
916 if proto.name == 'http':
917 return ooberror(str(exc) + '\n')
917 return ooberror(str(exc) + '\n')
918 raise # cannot do better for bundle1 + ssh
918 raise # cannot do better for bundle1 + ssh
919 # bundle2 request expect a bundle2 reply
919 # bundle2 request expect a bundle2 reply
920 bundler = bundle2.bundle20(repo.ui)
920 bundler = bundle2.bundle20(repo.ui)
921 manargs = [('message', str(exc))]
921 manargs = [('message', str(exc))]
922 advargs = []
922 advargs = []
923 if exc.hint is not None:
923 if exc.hint is not None:
924 advargs.append(('hint', exc.hint))
924 advargs.append(('hint', exc.hint))
925 bundler.addpart(bundle2.bundlepart('error:abort',
925 bundler.addpart(bundle2.bundlepart('error:abort',
926 manargs, advargs))
926 manargs, advargs))
927 chunks = bundler.getchunks()
927 chunks = bundler.getchunks()
928 prefercompressed = False
928 prefercompressed = False
929
929
930 return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
930 return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
931
931
932 @wireprotocommand('heads')
932 @wireprotocommand('heads')
933 def heads(repo, proto):
933 def heads(repo, proto):
934 h = repo.heads()
934 h = repo.heads()
935 return encodelist(h) + "\n"
935 return encodelist(h) + "\n"
936
936
937 @wireprotocommand('hello')
937 @wireprotocommand('hello')
938 def hello(repo, proto):
938 def hello(repo, proto):
939 '''the hello command returns a set of lines describing various
939 '''the hello command returns a set of lines describing various
940 interesting things about the server, in an RFC822-like format.
940 interesting things about the server, in an RFC822-like format.
941 Currently the only one defined is "capabilities", which
941 Currently the only one defined is "capabilities", which
942 consists of a line in the form:
942 consists of a line in the form:
943
943
944 capabilities: space separated list of tokens
944 capabilities: space separated list of tokens
945 '''
945 '''
946 return "capabilities: %s\n" % (capabilities(repo, proto))
946 return "capabilities: %s\n" % (capabilities(repo, proto))
947
947
948 @wireprotocommand('listkeys', 'namespace')
948 @wireprotocommand('listkeys', 'namespace')
949 def listkeys(repo, proto, namespace):
949 def listkeys(repo, proto, namespace):
950 d = repo.listkeys(encoding.tolocal(namespace)).items()
950 d = repo.listkeys(encoding.tolocal(namespace)).items()
951 return pushkeymod.encodekeys(d)
951 return pushkeymod.encodekeys(d)
952
952
953 @wireprotocommand('lookup', 'key')
953 @wireprotocommand('lookup', 'key')
954 def lookup(repo, proto, key):
954 def lookup(repo, proto, key):
955 try:
955 try:
956 k = encoding.tolocal(key)
956 k = encoding.tolocal(key)
957 c = repo[k]
957 c = repo[k]
958 r = c.hex()
958 r = c.hex()
959 success = 1
959 success = 1
960 except Exception as inst:
960 except Exception as inst:
961 r = str(inst)
961 r = str(inst)
962 success = 0
962 success = 0
963 return "%d %s\n" % (success, r)
963 return "%d %s\n" % (success, r)
964
964
965 @wireprotocommand('known', 'nodes *')
965 @wireprotocommand('known', 'nodes *')
966 def known(repo, proto, nodes, others):
966 def known(repo, proto, nodes, others):
967 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
967 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
968
968
969 @wireprotocommand('pushkey', 'namespace key old new')
969 @wireprotocommand('pushkey', 'namespace key old new')
970 def pushkey(repo, proto, namespace, key, old, new):
970 def pushkey(repo, proto, namespace, key, old, new):
971 # compatibility with pre-1.8 clients which were accidentally
971 # compatibility with pre-1.8 clients which were accidentally
972 # sending raw binary nodes rather than utf-8-encoded hex
972 # sending raw binary nodes rather than utf-8-encoded hex
973 if len(new) == 20 and util.escapestr(new) != new:
973 if len(new) == 20 and util.escapestr(new) != new:
974 # looks like it could be a binary node
974 # looks like it could be a binary node
975 try:
975 try:
976 new.decode('utf-8')
976 new.decode('utf-8')
977 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
977 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
978 except UnicodeDecodeError:
978 except UnicodeDecodeError:
979 pass # binary, leave unmodified
979 pass # binary, leave unmodified
980 else:
980 else:
981 new = encoding.tolocal(new) # normal path
981 new = encoding.tolocal(new) # normal path
982
982
983 with proto.mayberedirectstdio() as output:
983 with proto.mayberedirectstdio() as output:
984 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
984 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
985 encoding.tolocal(old), new) or False
985 encoding.tolocal(old), new) or False
986
986
987 output = output.getvalue() if output else ''
987 output = output.getvalue() if output else ''
988 return '%s\n%s' % (int(r), output)
988 return '%s\n%s' % (int(r), output)
989
989
990 @wireprotocommand('stream_out')
990 @wireprotocommand('stream_out')
991 def stream(repo, proto):
991 def stream(repo, proto):
992 '''If the server supports streaming clone, it advertises the "stream"
992 '''If the server supports streaming clone, it advertises the "stream"
993 capability with a value representing the version and flags of the repo
993 capability with a value representing the version and flags of the repo
994 it is serving. Client checks to see if it understands the format.
994 it is serving. Client checks to see if it understands the format.
995 '''
995 '''
996 return streamres_legacy(streamclone.generatev1wireproto(repo))
996 return streamres_legacy(streamclone.generatev1wireproto(repo))
997
997
998 @wireprotocommand('unbundle', 'heads')
998 @wireprotocommand('unbundle', 'heads')
999 def unbundle(repo, proto, heads):
999 def unbundle(repo, proto, heads):
1000 their_heads = decodelist(heads)
1000 their_heads = decodelist(heads)
1001
1001
1002 with proto.mayberedirectstdio() as output:
1002 with proto.mayberedirectstdio() as output:
1003 try:
1003 try:
1004 exchange.check_heads(repo, their_heads, 'preparing changes')
1004 exchange.check_heads(repo, their_heads, 'preparing changes')
1005
1005
1006 # write bundle data to temporary file because it can be big
1006 # write bundle data to temporary file because it can be big
1007 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1007 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1008 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1008 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1009 r = 0
1009 r = 0
1010 try:
1010 try:
1011 proto.getfile(fp)
1011 proto.forwardpayload(fp)
1012 fp.seek(0)
1012 fp.seek(0)
1013 gen = exchange.readbundle(repo.ui, fp, None)
1013 gen = exchange.readbundle(repo.ui, fp, None)
1014 if (isinstance(gen, changegroupmod.cg1unpacker)
1014 if (isinstance(gen, changegroupmod.cg1unpacker)
1015 and not bundle1allowed(repo, 'push')):
1015 and not bundle1allowed(repo, 'push')):
1016 if proto.name == 'http':
1016 if proto.name == 'http':
1017 # need to special case http because stderr do not get to
1017 # need to special case http because stderr do not get to
1018 # the http client on failed push so we need to abuse
1018 # the http client on failed push so we need to abuse
1019 # some other error type to make sure the message get to
1019 # some other error type to make sure the message get to
1020 # the user.
1020 # the user.
1021 return ooberror(bundle2required)
1021 return ooberror(bundle2required)
1022 raise error.Abort(bundle2requiredmain,
1022 raise error.Abort(bundle2requiredmain,
1023 hint=bundle2requiredhint)
1023 hint=bundle2requiredhint)
1024
1024
1025 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1025 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1026 proto.client())
1026 proto.client())
1027 if util.safehasattr(r, 'addpart'):
1027 if util.safehasattr(r, 'addpart'):
1028 # The return looks streamable, we are in the bundle2 case
1028 # The return looks streamable, we are in the bundle2 case
1029 # and should return a stream.
1029 # and should return a stream.
1030 return streamres_legacy(gen=r.getchunks())
1030 return streamres_legacy(gen=r.getchunks())
1031 return pushres(r, output.getvalue() if output else '')
1031 return pushres(r, output.getvalue() if output else '')
1032
1032
1033 finally:
1033 finally:
1034 fp.close()
1034 fp.close()
1035 os.unlink(tempname)
1035 os.unlink(tempname)
1036
1036
1037 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1037 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1038 # handle non-bundle2 case first
1038 # handle non-bundle2 case first
1039 if not getattr(exc, 'duringunbundle2', False):
1039 if not getattr(exc, 'duringunbundle2', False):
1040 try:
1040 try:
1041 raise
1041 raise
1042 except error.Abort:
1042 except error.Abort:
1043 # The old code we moved used util.stderr directly.
1043 # The old code we moved used util.stderr directly.
1044 # We did not change it to minimise code change.
1044 # We did not change it to minimise code change.
1045 # This need to be moved to something proper.
1045 # This need to be moved to something proper.
1046 # Feel free to do it.
1046 # Feel free to do it.
1047 util.stderr.write("abort: %s\n" % exc)
1047 util.stderr.write("abort: %s\n" % exc)
1048 if exc.hint is not None:
1048 if exc.hint is not None:
1049 util.stderr.write("(%s)\n" % exc.hint)
1049 util.stderr.write("(%s)\n" % exc.hint)
1050 return pushres(0, output.getvalue() if output else '')
1050 return pushres(0, output.getvalue() if output else '')
1051 except error.PushRaced:
1051 except error.PushRaced:
1052 return pusherr(str(exc),
1052 return pusherr(str(exc),
1053 output.getvalue() if output else '')
1053 output.getvalue() if output else '')
1054
1054
1055 bundler = bundle2.bundle20(repo.ui)
1055 bundler = bundle2.bundle20(repo.ui)
1056 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1056 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1057 bundler.addpart(out)
1057 bundler.addpart(out)
1058 try:
1058 try:
1059 try:
1059 try:
1060 raise
1060 raise
1061 except error.PushkeyFailed as exc:
1061 except error.PushkeyFailed as exc:
1062 # check client caps
1062 # check client caps
1063 remotecaps = getattr(exc, '_replycaps', None)
1063 remotecaps = getattr(exc, '_replycaps', None)
1064 if (remotecaps is not None
1064 if (remotecaps is not None
1065 and 'pushkey' not in remotecaps.get('error', ())):
1065 and 'pushkey' not in remotecaps.get('error', ())):
1066 # no support remote side, fallback to Abort handler.
1066 # no support remote side, fallback to Abort handler.
1067 raise
1067 raise
1068 part = bundler.newpart('error:pushkey')
1068 part = bundler.newpart('error:pushkey')
1069 part.addparam('in-reply-to', exc.partid)
1069 part.addparam('in-reply-to', exc.partid)
1070 if exc.namespace is not None:
1070 if exc.namespace is not None:
1071 part.addparam('namespace', exc.namespace,
1071 part.addparam('namespace', exc.namespace,
1072 mandatory=False)
1072 mandatory=False)
1073 if exc.key is not None:
1073 if exc.key is not None:
1074 part.addparam('key', exc.key, mandatory=False)
1074 part.addparam('key', exc.key, mandatory=False)
1075 if exc.new is not None:
1075 if exc.new is not None:
1076 part.addparam('new', exc.new, mandatory=False)
1076 part.addparam('new', exc.new, mandatory=False)
1077 if exc.old is not None:
1077 if exc.old is not None:
1078 part.addparam('old', exc.old, mandatory=False)
1078 part.addparam('old', exc.old, mandatory=False)
1079 if exc.ret is not None:
1079 if exc.ret is not None:
1080 part.addparam('ret', exc.ret, mandatory=False)
1080 part.addparam('ret', exc.ret, mandatory=False)
1081 except error.BundleValueError as exc:
1081 except error.BundleValueError as exc:
1082 errpart = bundler.newpart('error:unsupportedcontent')
1082 errpart = bundler.newpart('error:unsupportedcontent')
1083 if exc.parttype is not None:
1083 if exc.parttype is not None:
1084 errpart.addparam('parttype', exc.parttype)
1084 errpart.addparam('parttype', exc.parttype)
1085 if exc.params:
1085 if exc.params:
1086 errpart.addparam('params', '\0'.join(exc.params))
1086 errpart.addparam('params', '\0'.join(exc.params))
1087 except error.Abort as exc:
1087 except error.Abort as exc:
1088 manargs = [('message', str(exc))]
1088 manargs = [('message', str(exc))]
1089 advargs = []
1089 advargs = []
1090 if exc.hint is not None:
1090 if exc.hint is not None:
1091 advargs.append(('hint', exc.hint))
1091 advargs.append(('hint', exc.hint))
1092 bundler.addpart(bundle2.bundlepart('error:abort',
1092 bundler.addpart(bundle2.bundlepart('error:abort',
1093 manargs, advargs))
1093 manargs, advargs))
1094 except error.PushRaced as exc:
1094 except error.PushRaced as exc:
1095 bundler.newpart('error:pushraced', [('message', str(exc))])
1095 bundler.newpart('error:pushraced', [('message', str(exc))])
1096 return streamres_legacy(gen=bundler.getchunks())
1096 return streamres_legacy(gen=bundler.getchunks())
@@ -1,454 +1,455 b''
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 #
3 #
4 # This software may be used and distributed according to the terms of the
4 # This software may be used and distributed according to the terms of the
5 # GNU General Public License version 2 or any later version.
5 # GNU General Public License version 2 or any later version.
6
6
7 from __future__ import absolute_import
7 from __future__ import absolute_import
8
8
9 import abc
9 import abc
10 import cgi
10 import cgi
11 import contextlib
11 import contextlib
12 import struct
12 import struct
13 import sys
13 import sys
14
14
15 from .i18n import _
15 from .i18n import _
16 from . import (
16 from . import (
17 encoding,
17 encoding,
18 error,
18 error,
19 hook,
19 hook,
20 pycompat,
20 pycompat,
21 util,
21 util,
22 wireproto,
22 wireproto,
23 )
23 )
24
24
25 stringio = util.stringio
25 stringio = util.stringio
26
26
27 urlerr = util.urlerr
27 urlerr = util.urlerr
28 urlreq = util.urlreq
28 urlreq = util.urlreq
29
29
30 HTTP_OK = 200
30 HTTP_OK = 200
31
31
32 HGTYPE = 'application/mercurial-0.1'
32 HGTYPE = 'application/mercurial-0.1'
33 HGTYPE2 = 'application/mercurial-0.2'
33 HGTYPE2 = 'application/mercurial-0.2'
34 HGERRTYPE = 'application/hg-error'
34 HGERRTYPE = 'application/hg-error'
35
35
36 # Names of the SSH protocol implementations.
36 # Names of the SSH protocol implementations.
37 SSHV1 = 'ssh-v1'
37 SSHV1 = 'ssh-v1'
38 # This is advertised over the wire. Incremental the counter at the end
38 # This is advertised over the wire. Incremental the counter at the end
39 # to reflect BC breakages.
39 # to reflect BC breakages.
40 SSHV2 = 'exp-ssh-v2-0001'
40 SSHV2 = 'exp-ssh-v2-0001'
41
41
42 class baseprotocolhandler(object):
42 class baseprotocolhandler(object):
43 """Abstract base class for wire protocol handlers.
43 """Abstract base class for wire protocol handlers.
44
44
45 A wire protocol handler serves as an interface between protocol command
45 A wire protocol handler serves as an interface between protocol command
46 handlers and the wire protocol transport layer. Protocol handlers provide
46 handlers and the wire protocol transport layer. Protocol handlers provide
47 methods to read command arguments, redirect stdio for the duration of
47 methods to read command arguments, redirect stdio for the duration of
48 the request, handle response types, etc.
48 the request, handle response types, etc.
49 """
49 """
50
50
51 __metaclass__ = abc.ABCMeta
51 __metaclass__ = abc.ABCMeta
52
52
53 @abc.abstractproperty
53 @abc.abstractproperty
54 def name(self):
54 def name(self):
55 """The name of the protocol implementation.
55 """The name of the protocol implementation.
56
56
57 Used for uniquely identifying the transport type.
57 Used for uniquely identifying the transport type.
58 """
58 """
59
59
60 @abc.abstractmethod
60 @abc.abstractmethod
61 def getargs(self, args):
61 def getargs(self, args):
62 """return the value for arguments in <args>
62 """return the value for arguments in <args>
63
63
64 returns a list of values (same order as <args>)"""
64 returns a list of values (same order as <args>)"""
65
65
66 @abc.abstractmethod
66 @abc.abstractmethod
67 def getfile(self, fp):
67 def forwardpayload(self, fp):
68 """write the whole content of a file into a file like object
68 """Read the raw payload and forward to a file.
69
69
70 The file is in the form::
70 The payload is read in full before the function returns.
71
72 (<chunk-size>\n<chunk>)+0\n
73
74 chunk size is the ascii version of the int.
75 """
71 """
76
72
77 @abc.abstractmethod
73 @abc.abstractmethod
78 def mayberedirectstdio(self):
74 def mayberedirectstdio(self):
79 """Context manager to possibly redirect stdio.
75 """Context manager to possibly redirect stdio.
80
76
81 The context manager yields a file-object like object that receives
77 The context manager yields a file-object like object that receives
82 stdout and stderr output when the context manager is active. Or it
78 stdout and stderr output when the context manager is active. Or it
83 yields ``None`` if no I/O redirection occurs.
79 yields ``None`` if no I/O redirection occurs.
84
80
85 The intent of this context manager is to capture stdio output
81 The intent of this context manager is to capture stdio output
86 so it may be sent in the response. Some transports support streaming
82 so it may be sent in the response. Some transports support streaming
87 stdio to the client in real time. For these transports, stdio output
83 stdio to the client in real time. For these transports, stdio output
88 won't be captured.
84 won't be captured.
89 """
85 """
90
86
91 @abc.abstractmethod
87 @abc.abstractmethod
92 def client(self):
88 def client(self):
93 """Returns a string representation of this client (as bytes)."""
89 """Returns a string representation of this client (as bytes)."""
94
90
95 def decodevaluefromheaders(req, headerprefix):
91 def decodevaluefromheaders(req, headerprefix):
96 """Decode a long value from multiple HTTP request headers.
92 """Decode a long value from multiple HTTP request headers.
97
93
98 Returns the value as a bytes, not a str.
94 Returns the value as a bytes, not a str.
99 """
95 """
100 chunks = []
96 chunks = []
101 i = 1
97 i = 1
102 prefix = headerprefix.upper().replace(r'-', r'_')
98 prefix = headerprefix.upper().replace(r'-', r'_')
103 while True:
99 while True:
104 v = req.env.get(r'HTTP_%s_%d' % (prefix, i))
100 v = req.env.get(r'HTTP_%s_%d' % (prefix, i))
105 if v is None:
101 if v is None:
106 break
102 break
107 chunks.append(pycompat.bytesurl(v))
103 chunks.append(pycompat.bytesurl(v))
108 i += 1
104 i += 1
109
105
110 return ''.join(chunks)
106 return ''.join(chunks)
111
107
112 class webproto(baseprotocolhandler):
108 class webproto(baseprotocolhandler):
113 def __init__(self, req, ui):
109 def __init__(self, req, ui):
114 self._req = req
110 self._req = req
115 self._ui = ui
111 self._ui = ui
116
112
117 @property
113 @property
118 def name(self):
114 def name(self):
119 return 'http'
115 return 'http'
120
116
121 def getargs(self, args):
117 def getargs(self, args):
122 knownargs = self._args()
118 knownargs = self._args()
123 data = {}
119 data = {}
124 keys = args.split()
120 keys = args.split()
125 for k in keys:
121 for k in keys:
126 if k == '*':
122 if k == '*':
127 star = {}
123 star = {}
128 for key in knownargs.keys():
124 for key in knownargs.keys():
129 if key != 'cmd' and key not in keys:
125 if key != 'cmd' and key not in keys:
130 star[key] = knownargs[key][0]
126 star[key] = knownargs[key][0]
131 data['*'] = star
127 data['*'] = star
132 else:
128 else:
133 data[k] = knownargs[k][0]
129 data[k] = knownargs[k][0]
134 return [data[k] for k in keys]
130 return [data[k] for k in keys]
135
131
136 def _args(self):
132 def _args(self):
137 args = util.rapply(pycompat.bytesurl, self._req.form.copy())
133 args = util.rapply(pycompat.bytesurl, self._req.form.copy())
138 postlen = int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
134 postlen = int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
139 if postlen:
135 if postlen:
140 args.update(cgi.parse_qs(
136 args.update(cgi.parse_qs(
141 self._req.read(postlen), keep_blank_values=True))
137 self._req.read(postlen), keep_blank_values=True))
142 return args
138 return args
143
139
144 argvalue = decodevaluefromheaders(self._req, r'X-HgArg')
140 argvalue = decodevaluefromheaders(self._req, r'X-HgArg')
145 args.update(cgi.parse_qs(argvalue, keep_blank_values=True))
141 args.update(cgi.parse_qs(argvalue, keep_blank_values=True))
146 return args
142 return args
147
143
148 def getfile(self, fp):
144 def forwardpayload(self, fp):
149 length = int(self._req.env[r'CONTENT_LENGTH'])
145 length = int(self._req.env[r'CONTENT_LENGTH'])
150 # If httppostargs is used, we need to read Content-Length
146 # If httppostargs is used, we need to read Content-Length
151 # minus the amount that was consumed by args.
147 # minus the amount that was consumed by args.
152 length -= int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
148 length -= int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
153 for s in util.filechunkiter(self._req, limit=length):
149 for s in util.filechunkiter(self._req, limit=length):
154 fp.write(s)
150 fp.write(s)
155
151
156 @contextlib.contextmanager
152 @contextlib.contextmanager
157 def mayberedirectstdio(self):
153 def mayberedirectstdio(self):
158 oldout = self._ui.fout
154 oldout = self._ui.fout
159 olderr = self._ui.ferr
155 olderr = self._ui.ferr
160
156
161 out = util.stringio()
157 out = util.stringio()
162
158
163 try:
159 try:
164 self._ui.fout = out
160 self._ui.fout = out
165 self._ui.ferr = out
161 self._ui.ferr = out
166 yield out
162 yield out
167 finally:
163 finally:
168 self._ui.fout = oldout
164 self._ui.fout = oldout
169 self._ui.ferr = olderr
165 self._ui.ferr = olderr
170
166
171 def client(self):
167 def client(self):
172 return 'remote:%s:%s:%s' % (
168 return 'remote:%s:%s:%s' % (
173 self._req.env.get('wsgi.url_scheme') or 'http',
169 self._req.env.get('wsgi.url_scheme') or 'http',
174 urlreq.quote(self._req.env.get('REMOTE_HOST', '')),
170 urlreq.quote(self._req.env.get('REMOTE_HOST', '')),
175 urlreq.quote(self._req.env.get('REMOTE_USER', '')))
171 urlreq.quote(self._req.env.get('REMOTE_USER', '')))
176
172
177 def responsetype(self, prefer_uncompressed):
173 def responsetype(self, prefer_uncompressed):
178 """Determine the appropriate response type and compression settings.
174 """Determine the appropriate response type and compression settings.
179
175
180 Returns a tuple of (mediatype, compengine, engineopts).
176 Returns a tuple of (mediatype, compengine, engineopts).
181 """
177 """
182 # Determine the response media type and compression engine based
178 # Determine the response media type and compression engine based
183 # on the request parameters.
179 # on the request parameters.
184 protocaps = decodevaluefromheaders(self._req, r'X-HgProto').split(' ')
180 protocaps = decodevaluefromheaders(self._req, r'X-HgProto').split(' ')
185
181
186 if '0.2' in protocaps:
182 if '0.2' in protocaps:
187 # All clients are expected to support uncompressed data.
183 # All clients are expected to support uncompressed data.
188 if prefer_uncompressed:
184 if prefer_uncompressed:
189 return HGTYPE2, util._noopengine(), {}
185 return HGTYPE2, util._noopengine(), {}
190
186
191 # Default as defined by wire protocol spec.
187 # Default as defined by wire protocol spec.
192 compformats = ['zlib', 'none']
188 compformats = ['zlib', 'none']
193 for cap in protocaps:
189 for cap in protocaps:
194 if cap.startswith('comp='):
190 if cap.startswith('comp='):
195 compformats = cap[5:].split(',')
191 compformats = cap[5:].split(',')
196 break
192 break
197
193
198 # Now find an agreed upon compression format.
194 # Now find an agreed upon compression format.
199 for engine in wireproto.supportedcompengines(self._ui, self,
195 for engine in wireproto.supportedcompengines(self._ui, self,
200 util.SERVERROLE):
196 util.SERVERROLE):
201 if engine.wireprotosupport().name in compformats:
197 if engine.wireprotosupport().name in compformats:
202 opts = {}
198 opts = {}
203 level = self._ui.configint('server',
199 level = self._ui.configint('server',
204 '%slevel' % engine.name())
200 '%slevel' % engine.name())
205 if level is not None:
201 if level is not None:
206 opts['level'] = level
202 opts['level'] = level
207
203
208 return HGTYPE2, engine, opts
204 return HGTYPE2, engine, opts
209
205
210 # No mutually supported compression format. Fall back to the
206 # No mutually supported compression format. Fall back to the
211 # legacy protocol.
207 # legacy protocol.
212
208
213 # Don't allow untrusted settings because disabling compression or
209 # Don't allow untrusted settings because disabling compression or
214 # setting a very high compression level could lead to flooding
210 # setting a very high compression level could lead to flooding
215 # the server's network or CPU.
211 # the server's network or CPU.
216 opts = {'level': self._ui.configint('server', 'zliblevel')}
212 opts = {'level': self._ui.configint('server', 'zliblevel')}
217 return HGTYPE, util.compengines['zlib'], opts
213 return HGTYPE, util.compengines['zlib'], opts
218
214
219 def iscmd(cmd):
215 def iscmd(cmd):
220 return cmd in wireproto.commands
216 return cmd in wireproto.commands
221
217
222 def parsehttprequest(repo, req, query):
218 def parsehttprequest(repo, req, query):
223 """Parse the HTTP request for a wire protocol request.
219 """Parse the HTTP request for a wire protocol request.
224
220
225 If the current request appears to be a wire protocol request, this
221 If the current request appears to be a wire protocol request, this
226 function returns a dict with details about that request, including
222 function returns a dict with details about that request, including
227 an ``abstractprotocolserver`` instance suitable for handling the
223 an ``abstractprotocolserver`` instance suitable for handling the
228 request. Otherwise, ``None`` is returned.
224 request. Otherwise, ``None`` is returned.
229
225
230 ``req`` is a ``wsgirequest`` instance.
226 ``req`` is a ``wsgirequest`` instance.
231 """
227 """
232 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
228 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
233 # string parameter. If it isn't present, this isn't a wire protocol
229 # string parameter. If it isn't present, this isn't a wire protocol
234 # request.
230 # request.
235 if r'cmd' not in req.form:
231 if r'cmd' not in req.form:
236 return None
232 return None
237
233
238 cmd = pycompat.sysbytes(req.form[r'cmd'][0])
234 cmd = pycompat.sysbytes(req.form[r'cmd'][0])
239
235
240 # The "cmd" request parameter is used by both the wire protocol and hgweb.
236 # The "cmd" request parameter is used by both the wire protocol and hgweb.
241 # While not all wire protocol commands are available for all transports,
237 # While not all wire protocol commands are available for all transports,
242 # if we see a "cmd" value that resembles a known wire protocol command, we
238 # if we see a "cmd" value that resembles a known wire protocol command, we
243 # route it to a protocol handler. This is better than routing possible
239 # route it to a protocol handler. This is better than routing possible
244 # wire protocol requests to hgweb because it prevents hgweb from using
240 # wire protocol requests to hgweb because it prevents hgweb from using
245 # known wire protocol commands and it is less confusing for machine
241 # known wire protocol commands and it is less confusing for machine
246 # clients.
242 # clients.
247 if cmd not in wireproto.commands:
243 if cmd not in wireproto.commands:
248 return None
244 return None
249
245
250 proto = webproto(req, repo.ui)
246 proto = webproto(req, repo.ui)
251
247
252 return {
248 return {
253 'cmd': cmd,
249 'cmd': cmd,
254 'proto': proto,
250 'proto': proto,
255 'dispatch': lambda: _callhttp(repo, req, proto, cmd),
251 'dispatch': lambda: _callhttp(repo, req, proto, cmd),
256 'handleerror': lambda ex: _handlehttperror(ex, req, cmd),
252 'handleerror': lambda ex: _handlehttperror(ex, req, cmd),
257 }
253 }
258
254
259 def _callhttp(repo, req, proto, cmd):
255 def _callhttp(repo, req, proto, cmd):
260 def genversion2(gen, engine, engineopts):
256 def genversion2(gen, engine, engineopts):
261 # application/mercurial-0.2 always sends a payload header
257 # application/mercurial-0.2 always sends a payload header
262 # identifying the compression engine.
258 # identifying the compression engine.
263 name = engine.wireprotosupport().name
259 name = engine.wireprotosupport().name
264 assert 0 < len(name) < 256
260 assert 0 < len(name) < 256
265 yield struct.pack('B', len(name))
261 yield struct.pack('B', len(name))
266 yield name
262 yield name
267
263
268 for chunk in gen:
264 for chunk in gen:
269 yield chunk
265 yield chunk
270
266
271 rsp = wireproto.dispatch(repo, proto, cmd)
267 rsp = wireproto.dispatch(repo, proto, cmd)
272
268
273 if not wireproto.commands.commandavailable(cmd, proto):
269 if not wireproto.commands.commandavailable(cmd, proto):
274 req.respond(HTTP_OK, HGERRTYPE,
270 req.respond(HTTP_OK, HGERRTYPE,
275 body=_('requested wire protocol command is not available '
271 body=_('requested wire protocol command is not available '
276 'over HTTP'))
272 'over HTTP'))
277 return []
273 return []
278
274
279 if isinstance(rsp, bytes):
275 if isinstance(rsp, bytes):
280 req.respond(HTTP_OK, HGTYPE, body=rsp)
276 req.respond(HTTP_OK, HGTYPE, body=rsp)
281 return []
277 return []
282 elif isinstance(rsp, wireproto.streamres_legacy):
278 elif isinstance(rsp, wireproto.streamres_legacy):
283 gen = rsp.gen
279 gen = rsp.gen
284 req.respond(HTTP_OK, HGTYPE)
280 req.respond(HTTP_OK, HGTYPE)
285 return gen
281 return gen
286 elif isinstance(rsp, wireproto.streamres):
282 elif isinstance(rsp, wireproto.streamres):
287 gen = rsp.gen
283 gen = rsp.gen
288
284
289 # This code for compression should not be streamres specific. It
285 # This code for compression should not be streamres specific. It
290 # is here because we only compress streamres at the moment.
286 # is here because we only compress streamres at the moment.
291 mediatype, engine, engineopts = proto.responsetype(
287 mediatype, engine, engineopts = proto.responsetype(
292 rsp.prefer_uncompressed)
288 rsp.prefer_uncompressed)
293 gen = engine.compressstream(gen, engineopts)
289 gen = engine.compressstream(gen, engineopts)
294
290
295 if mediatype == HGTYPE2:
291 if mediatype == HGTYPE2:
296 gen = genversion2(gen, engine, engineopts)
292 gen = genversion2(gen, engine, engineopts)
297
293
298 req.respond(HTTP_OK, mediatype)
294 req.respond(HTTP_OK, mediatype)
299 return gen
295 return gen
300 elif isinstance(rsp, wireproto.pushres):
296 elif isinstance(rsp, wireproto.pushres):
301 rsp = '%d\n%s' % (rsp.res, rsp.output)
297 rsp = '%d\n%s' % (rsp.res, rsp.output)
302 req.respond(HTTP_OK, HGTYPE, body=rsp)
298 req.respond(HTTP_OK, HGTYPE, body=rsp)
303 return []
299 return []
304 elif isinstance(rsp, wireproto.pusherr):
300 elif isinstance(rsp, wireproto.pusherr):
305 # This is the httplib workaround documented in _handlehttperror().
301 # This is the httplib workaround documented in _handlehttperror().
306 req.drain()
302 req.drain()
307
303
308 rsp = '0\n%s\n' % rsp.res
304 rsp = '0\n%s\n' % rsp.res
309 req.respond(HTTP_OK, HGTYPE, body=rsp)
305 req.respond(HTTP_OK, HGTYPE, body=rsp)
310 return []
306 return []
311 elif isinstance(rsp, wireproto.ooberror):
307 elif isinstance(rsp, wireproto.ooberror):
312 rsp = rsp.message
308 rsp = rsp.message
313 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
309 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
314 return []
310 return []
315 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
311 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
316
312
317 def _handlehttperror(e, req, cmd):
313 def _handlehttperror(e, req, cmd):
318 """Called when an ErrorResponse is raised during HTTP request processing."""
314 """Called when an ErrorResponse is raised during HTTP request processing."""
319
315
320 # Clients using Python's httplib are stateful: the HTTP client
316 # Clients using Python's httplib are stateful: the HTTP client
321 # won't process an HTTP response until all request data is
317 # won't process an HTTP response until all request data is
322 # sent to the server. The intent of this code is to ensure
318 # sent to the server. The intent of this code is to ensure
323 # we always read HTTP request data from the client, thus
319 # we always read HTTP request data from the client, thus
324 # ensuring httplib transitions to a state that allows it to read
320 # ensuring httplib transitions to a state that allows it to read
325 # the HTTP response. In other words, it helps prevent deadlocks
321 # the HTTP response. In other words, it helps prevent deadlocks
326 # on clients using httplib.
322 # on clients using httplib.
327
323
328 if (req.env[r'REQUEST_METHOD'] == r'POST' and
324 if (req.env[r'REQUEST_METHOD'] == r'POST' and
329 # But not if Expect: 100-continue is being used.
325 # But not if Expect: 100-continue is being used.
330 (req.env.get('HTTP_EXPECT',
326 (req.env.get('HTTP_EXPECT',
331 '').lower() != '100-continue') or
327 '').lower() != '100-continue') or
332 # Or the non-httplib HTTP library is being advertised by
328 # Or the non-httplib HTTP library is being advertised by
333 # the client.
329 # the client.
334 req.env.get('X-HgHttp2', '')):
330 req.env.get('X-HgHttp2', '')):
335 req.drain()
331 req.drain()
336 else:
332 else:
337 req.headers.append((r'Connection', r'Close'))
333 req.headers.append((r'Connection', r'Close'))
338
334
339 # TODO This response body assumes the failed command was
335 # TODO This response body assumes the failed command was
340 # "unbundle." That assumption is not always valid.
336 # "unbundle." That assumption is not always valid.
341 req.respond(e, HGTYPE, body='0\n%s\n' % e)
337 req.respond(e, HGTYPE, body='0\n%s\n' % e)
342
338
343 return ''
339 return ''
344
340
345 def _sshv1respondbytes(fout, value):
341 def _sshv1respondbytes(fout, value):
346 """Send a bytes response for protocol version 1."""
342 """Send a bytes response for protocol version 1."""
347 fout.write('%d\n' % len(value))
343 fout.write('%d\n' % len(value))
348 fout.write(value)
344 fout.write(value)
349 fout.flush()
345 fout.flush()
350
346
351 def _sshv1respondstream(fout, source):
347 def _sshv1respondstream(fout, source):
352 write = fout.write
348 write = fout.write
353 for chunk in source.gen:
349 for chunk in source.gen:
354 write(chunk)
350 write(chunk)
355 fout.flush()
351 fout.flush()
356
352
357 def _sshv1respondooberror(fout, ferr, rsp):
353 def _sshv1respondooberror(fout, ferr, rsp):
358 ferr.write(b'%s\n-\n' % rsp)
354 ferr.write(b'%s\n-\n' % rsp)
359 ferr.flush()
355 ferr.flush()
360 fout.write(b'\n')
356 fout.write(b'\n')
361 fout.flush()
357 fout.flush()
362
358
363 class sshv1protocolhandler(baseprotocolhandler):
359 class sshv1protocolhandler(baseprotocolhandler):
364 """Handler for requests services via version 1 of SSH protocol."""
360 """Handler for requests services via version 1 of SSH protocol."""
365 def __init__(self, ui, fin, fout):
361 def __init__(self, ui, fin, fout):
366 self._ui = ui
362 self._ui = ui
367 self._fin = fin
363 self._fin = fin
368 self._fout = fout
364 self._fout = fout
369
365
370 @property
366 @property
371 def name(self):
367 def name(self):
372 return 'ssh'
368 return 'ssh'
373
369
374 def getargs(self, args):
370 def getargs(self, args):
375 data = {}
371 data = {}
376 keys = args.split()
372 keys = args.split()
377 for n in xrange(len(keys)):
373 for n in xrange(len(keys)):
378 argline = self._fin.readline()[:-1]
374 argline = self._fin.readline()[:-1]
379 arg, l = argline.split()
375 arg, l = argline.split()
380 if arg not in keys:
376 if arg not in keys:
381 raise error.Abort(_("unexpected parameter %r") % arg)
377 raise error.Abort(_("unexpected parameter %r") % arg)
382 if arg == '*':
378 if arg == '*':
383 star = {}
379 star = {}
384 for k in xrange(int(l)):
380 for k in xrange(int(l)):
385 argline = self._fin.readline()[:-1]
381 argline = self._fin.readline()[:-1]
386 arg, l = argline.split()
382 arg, l = argline.split()
387 val = self._fin.read(int(l))
383 val = self._fin.read(int(l))
388 star[arg] = val
384 star[arg] = val
389 data['*'] = star
385 data['*'] = star
390 else:
386 else:
391 val = self._fin.read(int(l))
387 val = self._fin.read(int(l))
392 data[arg] = val
388 data[arg] = val
393 return [data[k] for k in keys]
389 return [data[k] for k in keys]
394
390
395 def getfile(self, fpout):
391 def forwardpayload(self, fpout):
392 # The file is in the form:
393 #
394 # <chunk size>\n<chunk>
395 # ...
396 # 0\n
396 _sshv1respondbytes(self._fout, b'')
397 _sshv1respondbytes(self._fout, b'')
397 count = int(self._fin.readline())
398 count = int(self._fin.readline())
398 while count:
399 while count:
399 fpout.write(self._fin.read(count))
400 fpout.write(self._fin.read(count))
400 count = int(self._fin.readline())
401 count = int(self._fin.readline())
401
402
402 @contextlib.contextmanager
403 @contextlib.contextmanager
403 def mayberedirectstdio(self):
404 def mayberedirectstdio(self):
404 yield None
405 yield None
405
406
406 def client(self):
407 def client(self):
407 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
408 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
408 return 'remote:ssh:' + client
409 return 'remote:ssh:' + client
409
410
410 class sshserver(object):
411 class sshserver(object):
411 def __init__(self, ui, repo):
412 def __init__(self, ui, repo):
412 self._ui = ui
413 self._ui = ui
413 self._repo = repo
414 self._repo = repo
414 self._fin = ui.fin
415 self._fin = ui.fin
415 self._fout = ui.fout
416 self._fout = ui.fout
416
417
417 hook.redirect(True)
418 hook.redirect(True)
418 ui.fout = repo.ui.fout = ui.ferr
419 ui.fout = repo.ui.fout = ui.ferr
419
420
420 # Prevent insertion/deletion of CRs
421 # Prevent insertion/deletion of CRs
421 util.setbinary(self._fin)
422 util.setbinary(self._fin)
422 util.setbinary(self._fout)
423 util.setbinary(self._fout)
423
424
424 self._proto = sshv1protocolhandler(self._ui, self._fin, self._fout)
425 self._proto = sshv1protocolhandler(self._ui, self._fin, self._fout)
425
426
426 def serve_forever(self):
427 def serve_forever(self):
427 while self.serve_one():
428 while self.serve_one():
428 pass
429 pass
429 sys.exit(0)
430 sys.exit(0)
430
431
431 def serve_one(self):
432 def serve_one(self):
432 cmd = self._fin.readline()[:-1]
433 cmd = self._fin.readline()[:-1]
433 if cmd and wireproto.commands.commandavailable(cmd, self._proto):
434 if cmd and wireproto.commands.commandavailable(cmd, self._proto):
434 rsp = wireproto.dispatch(self._repo, self._proto, cmd)
435 rsp = wireproto.dispatch(self._repo, self._proto, cmd)
435
436
436 if isinstance(rsp, bytes):
437 if isinstance(rsp, bytes):
437 _sshv1respondbytes(self._fout, rsp)
438 _sshv1respondbytes(self._fout, rsp)
438 elif isinstance(rsp, wireproto.streamres):
439 elif isinstance(rsp, wireproto.streamres):
439 _sshv1respondstream(self._fout, rsp)
440 _sshv1respondstream(self._fout, rsp)
440 elif isinstance(rsp, wireproto.streamres_legacy):
441 elif isinstance(rsp, wireproto.streamres_legacy):
441 _sshv1respondstream(self._fout, rsp)
442 _sshv1respondstream(self._fout, rsp)
442 elif isinstance(rsp, wireproto.pushres):
443 elif isinstance(rsp, wireproto.pushres):
443 _sshv1respondbytes(self._fout, b'')
444 _sshv1respondbytes(self._fout, b'')
444 _sshv1respondbytes(self._fout, bytes(rsp.res))
445 _sshv1respondbytes(self._fout, bytes(rsp.res))
445 elif isinstance(rsp, wireproto.pusherr):
446 elif isinstance(rsp, wireproto.pusherr):
446 _sshv1respondbytes(self._fout, rsp.res)
447 _sshv1respondbytes(self._fout, rsp.res)
447 elif isinstance(rsp, wireproto.ooberror):
448 elif isinstance(rsp, wireproto.ooberror):
448 _sshv1respondooberror(self._fout, self._ui.ferr, rsp.message)
449 _sshv1respondooberror(self._fout, self._ui.ferr, rsp.message)
449 else:
450 else:
450 raise error.ProgrammingError('unhandled response type from '
451 raise error.ProgrammingError('unhandled response type from '
451 'wire protocol command: %s' % rsp)
452 'wire protocol command: %s' % rsp)
452 elif cmd:
453 elif cmd:
453 _sshv1respondbytes(self._fout, b'')
454 _sshv1respondbytes(self._fout, b'')
454 return cmd != ''
455 return cmd != ''
General Comments 0
You need to be logged in to leave comments. Login now