##// END OF EJS Templates
wireproto: split streamres into legacy and modern case...
Joerg Sonnenberger -
r35768:a39a9df7 default
parent child Browse files
Show More
@@ -1,190 +1,190
1 # Copyright 2011 Fog Creek Software
1 # Copyright 2011 Fog Creek Software
2 #
2 #
3 # This software may be used and distributed according to the terms of the
3 # This software may be used and distributed according to the terms of the
4 # GNU General Public License version 2 or any later version.
4 # GNU General Public License version 2 or any later version.
5 from __future__ import absolute_import
5 from __future__ import absolute_import
6
6
7 import os
7 import os
8 import re
8 import re
9
9
10 from mercurial.i18n import _
10 from mercurial.i18n import _
11
11
12 from mercurial import (
12 from mercurial import (
13 error,
13 error,
14 httppeer,
14 httppeer,
15 util,
15 util,
16 wireproto,
16 wireproto,
17 )
17 )
18
18
19 from . import (
19 from . import (
20 lfutil,
20 lfutil,
21 )
21 )
22
22
23 urlerr = util.urlerr
23 urlerr = util.urlerr
24 urlreq = util.urlreq
24 urlreq = util.urlreq
25
25
26 LARGEFILES_REQUIRED_MSG = ('\nThis repository uses the largefiles extension.'
26 LARGEFILES_REQUIRED_MSG = ('\nThis repository uses the largefiles extension.'
27 '\n\nPlease enable it in your Mercurial config '
27 '\n\nPlease enable it in your Mercurial config '
28 'file.\n')
28 'file.\n')
29
29
30 # these will all be replaced by largefiles.uisetup
30 # these will all be replaced by largefiles.uisetup
31 ssholdcallstream = None
31 ssholdcallstream = None
32 httpoldcallstream = None
32 httpoldcallstream = None
33
33
34 def putlfile(repo, proto, sha):
34 def putlfile(repo, proto, sha):
35 '''Server command for putting a largefile into a repository's local store
35 '''Server command for putting a largefile into a repository's local store
36 and into the user cache.'''
36 and into the user cache.'''
37 proto.redirect()
37 proto.redirect()
38
38
39 path = lfutil.storepath(repo, sha)
39 path = lfutil.storepath(repo, sha)
40 util.makedirs(os.path.dirname(path))
40 util.makedirs(os.path.dirname(path))
41 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
41 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
42
42
43 try:
43 try:
44 proto.getfile(tmpfp)
44 proto.getfile(tmpfp)
45 tmpfp._fp.seek(0)
45 tmpfp._fp.seek(0)
46 if sha != lfutil.hexsha1(tmpfp._fp):
46 if sha != lfutil.hexsha1(tmpfp._fp):
47 raise IOError(0, _('largefile contents do not match hash'))
47 raise IOError(0, _('largefile contents do not match hash'))
48 tmpfp.close()
48 tmpfp.close()
49 lfutil.linktousercache(repo, sha)
49 lfutil.linktousercache(repo, sha)
50 except IOError as e:
50 except IOError as e:
51 repo.ui.warn(_('largefiles: failed to put %s into store: %s\n') %
51 repo.ui.warn(_('largefiles: failed to put %s into store: %s\n') %
52 (sha, e.strerror))
52 (sha, e.strerror))
53 return wireproto.pushres(1)
53 return wireproto.pushres(1)
54 finally:
54 finally:
55 tmpfp.discard()
55 tmpfp.discard()
56
56
57 return wireproto.pushres(0)
57 return wireproto.pushres(0)
58
58
59 def getlfile(repo, proto, sha):
59 def getlfile(repo, proto, sha):
60 '''Server command for retrieving a largefile from the repository-local
60 '''Server command for retrieving a largefile from the repository-local
61 cache or user cache.'''
61 cache or user cache.'''
62 filename = lfutil.findfile(repo, sha)
62 filename = lfutil.findfile(repo, sha)
63 if not filename:
63 if not filename:
64 raise error.Abort(_('requested largefile %s not present in cache')
64 raise error.Abort(_('requested largefile %s not present in cache')
65 % sha)
65 % sha)
66 f = open(filename, 'rb')
66 f = open(filename, 'rb')
67 length = os.fstat(f.fileno())[6]
67 length = os.fstat(f.fileno())[6]
68
68
69 # Since we can't set an HTTP content-length header here, and
69 # Since we can't set an HTTP content-length header here, and
70 # Mercurial core provides no way to give the length of a streamres
70 # Mercurial core provides no way to give the length of a streamres
71 # (and reading the entire file into RAM would be ill-advised), we
71 # (and reading the entire file into RAM would be ill-advised), we
72 # just send the length on the first line of the response, like the
72 # just send the length on the first line of the response, like the
73 # ssh proto does for string responses.
73 # ssh proto does for string responses.
74 def generator():
74 def generator():
75 yield '%d\n' % length
75 yield '%d\n' % length
76 for chunk in util.filechunkiter(f):
76 for chunk in util.filechunkiter(f):
77 yield chunk
77 yield chunk
78 return wireproto.streamres(gen=generator())
78 return wireproto.streamres_legacy(gen=generator())
79
79
80 def statlfile(repo, proto, sha):
80 def statlfile(repo, proto, sha):
81 '''Server command for checking if a largefile is present - returns '2\n' if
81 '''Server command for checking if a largefile is present - returns '2\n' if
82 the largefile is missing, '0\n' if it seems to be in good condition.
82 the largefile is missing, '0\n' if it seems to be in good condition.
83
83
84 The value 1 is reserved for mismatched checksum, but that is too expensive
84 The value 1 is reserved for mismatched checksum, but that is too expensive
85 to be verified on every stat and must be caught be running 'hg verify'
85 to be verified on every stat and must be caught be running 'hg verify'
86 server side.'''
86 server side.'''
87 filename = lfutil.findfile(repo, sha)
87 filename = lfutil.findfile(repo, sha)
88 if not filename:
88 if not filename:
89 return '2\n'
89 return '2\n'
90 return '0\n'
90 return '0\n'
91
91
92 def wirereposetup(ui, repo):
92 def wirereposetup(ui, repo):
93 class lfileswirerepository(repo.__class__):
93 class lfileswirerepository(repo.__class__):
94 def putlfile(self, sha, fd):
94 def putlfile(self, sha, fd):
95 # unfortunately, httprepository._callpush tries to convert its
95 # unfortunately, httprepository._callpush tries to convert its
96 # input file-like into a bundle before sending it, so we can't use
96 # input file-like into a bundle before sending it, so we can't use
97 # it ...
97 # it ...
98 if issubclass(self.__class__, httppeer.httppeer):
98 if issubclass(self.__class__, httppeer.httppeer):
99 res = self._call('putlfile', data=fd, sha=sha,
99 res = self._call('putlfile', data=fd, sha=sha,
100 headers={'content-type':'application/mercurial-0.1'})
100 headers={'content-type':'application/mercurial-0.1'})
101 try:
101 try:
102 d, output = res.split('\n', 1)
102 d, output = res.split('\n', 1)
103 for l in output.splitlines(True):
103 for l in output.splitlines(True):
104 self.ui.warn(_('remote: '), l) # assume l ends with \n
104 self.ui.warn(_('remote: '), l) # assume l ends with \n
105 return int(d)
105 return int(d)
106 except ValueError:
106 except ValueError:
107 self.ui.warn(_('unexpected putlfile response: %r\n') % res)
107 self.ui.warn(_('unexpected putlfile response: %r\n') % res)
108 return 1
108 return 1
109 # ... but we can't use sshrepository._call because the data=
109 # ... but we can't use sshrepository._call because the data=
110 # argument won't get sent, and _callpush does exactly what we want
110 # argument won't get sent, and _callpush does exactly what we want
111 # in this case: send the data straight through
111 # in this case: send the data straight through
112 else:
112 else:
113 try:
113 try:
114 ret, output = self._callpush("putlfile", fd, sha=sha)
114 ret, output = self._callpush("putlfile", fd, sha=sha)
115 if ret == "":
115 if ret == "":
116 raise error.ResponseError(_('putlfile failed:'),
116 raise error.ResponseError(_('putlfile failed:'),
117 output)
117 output)
118 return int(ret)
118 return int(ret)
119 except IOError:
119 except IOError:
120 return 1
120 return 1
121 except ValueError:
121 except ValueError:
122 raise error.ResponseError(
122 raise error.ResponseError(
123 _('putlfile failed (unexpected response):'), ret)
123 _('putlfile failed (unexpected response):'), ret)
124
124
125 def getlfile(self, sha):
125 def getlfile(self, sha):
126 """returns an iterable with the chunks of the file with sha sha"""
126 """returns an iterable with the chunks of the file with sha sha"""
127 stream = self._callstream("getlfile", sha=sha)
127 stream = self._callstream("getlfile", sha=sha)
128 length = stream.readline()
128 length = stream.readline()
129 try:
129 try:
130 length = int(length)
130 length = int(length)
131 except ValueError:
131 except ValueError:
132 self._abort(error.ResponseError(_("unexpected response:"),
132 self._abort(error.ResponseError(_("unexpected response:"),
133 length))
133 length))
134
134
135 # SSH streams will block if reading more than length
135 # SSH streams will block if reading more than length
136 for chunk in util.filechunkiter(stream, limit=length):
136 for chunk in util.filechunkiter(stream, limit=length):
137 yield chunk
137 yield chunk
138 # HTTP streams must hit the end to process the last empty
138 # HTTP streams must hit the end to process the last empty
139 # chunk of Chunked-Encoding so the connection can be reused.
139 # chunk of Chunked-Encoding so the connection can be reused.
140 if issubclass(self.__class__, httppeer.httppeer):
140 if issubclass(self.__class__, httppeer.httppeer):
141 chunk = stream.read(1)
141 chunk = stream.read(1)
142 if chunk:
142 if chunk:
143 self._abort(error.ResponseError(_("unexpected response:"),
143 self._abort(error.ResponseError(_("unexpected response:"),
144 chunk))
144 chunk))
145
145
146 @wireproto.batchable
146 @wireproto.batchable
147 def statlfile(self, sha):
147 def statlfile(self, sha):
148 f = wireproto.future()
148 f = wireproto.future()
149 result = {'sha': sha}
149 result = {'sha': sha}
150 yield result, f
150 yield result, f
151 try:
151 try:
152 yield int(f.value)
152 yield int(f.value)
153 except (ValueError, urlerr.httperror):
153 except (ValueError, urlerr.httperror):
154 # If the server returns anything but an integer followed by a
154 # If the server returns anything but an integer followed by a
155 # newline, newline, it's not speaking our language; if we get
155 # newline, newline, it's not speaking our language; if we get
156 # an HTTP error, we can't be sure the largefile is present;
156 # an HTTP error, we can't be sure the largefile is present;
157 # either way, consider it missing.
157 # either way, consider it missing.
158 yield 2
158 yield 2
159
159
160 repo.__class__ = lfileswirerepository
160 repo.__class__ = lfileswirerepository
161
161
162 # advertise the largefiles=serve capability
162 # advertise the largefiles=serve capability
163 def _capabilities(orig, repo, proto):
163 def _capabilities(orig, repo, proto):
164 '''announce largefile server capability'''
164 '''announce largefile server capability'''
165 caps = orig(repo, proto)
165 caps = orig(repo, proto)
166 caps.append('largefiles=serve')
166 caps.append('largefiles=serve')
167 return caps
167 return caps
168
168
169 def heads(repo, proto):
169 def heads(repo, proto):
170 '''Wrap server command - largefile capable clients will know to call
170 '''Wrap server command - largefile capable clients will know to call
171 lheads instead'''
171 lheads instead'''
172 if lfutil.islfilesrepo(repo):
172 if lfutil.islfilesrepo(repo):
173 return wireproto.ooberror(LARGEFILES_REQUIRED_MSG)
173 return wireproto.ooberror(LARGEFILES_REQUIRED_MSG)
174 return wireproto.heads(repo, proto)
174 return wireproto.heads(repo, proto)
175
175
176 def sshrepocallstream(self, cmd, **args):
176 def sshrepocallstream(self, cmd, **args):
177 if cmd == 'heads' and self.capable('largefiles'):
177 if cmd == 'heads' and self.capable('largefiles'):
178 cmd = 'lheads'
178 cmd = 'lheads'
179 if cmd == 'batch' and self.capable('largefiles'):
179 if cmd == 'batch' and self.capable('largefiles'):
180 args[r'cmds'] = args[r'cmds'].replace('heads ', 'lheads ')
180 args[r'cmds'] = args[r'cmds'].replace('heads ', 'lheads ')
181 return ssholdcallstream(self, cmd, **args)
181 return ssholdcallstream(self, cmd, **args)
182
182
183 headsre = re.compile(r'(^|;)heads\b')
183 headsre = re.compile(r'(^|;)heads\b')
184
184
185 def httprepocallstream(self, cmd, **args):
185 def httprepocallstream(self, cmd, **args):
186 if cmd == 'heads' and self.capable('largefiles'):
186 if cmd == 'heads' and self.capable('largefiles'):
187 cmd = 'lheads'
187 cmd = 'lheads'
188 if cmd == 'batch' and self.capable('largefiles'):
188 if cmd == 'batch' and self.capable('largefiles'):
189 args[r'cmds'] = headsre.sub('lheads', args[r'cmds'])
189 args[r'cmds'] = headsre.sub('lheads', args[r'cmds'])
190 return httpoldcallstream(self, cmd, **args)
190 return httpoldcallstream(self, cmd, **args)
@@ -1,207 +1,201
1 #
1 #
2 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import cgi
10 import cgi
11 import struct
11 import struct
12
12
13 from .common import (
13 from .common import (
14 HTTP_OK,
14 HTTP_OK,
15 )
15 )
16
16
17 from .. import (
17 from .. import (
18 error,
18 error,
19 pycompat,
19 pycompat,
20 util,
20 util,
21 wireproto,
21 wireproto,
22 )
22 )
23 stringio = util.stringio
23 stringio = util.stringio
24
24
25 urlerr = util.urlerr
25 urlerr = util.urlerr
26 urlreq = util.urlreq
26 urlreq = util.urlreq
27
27
28 HGTYPE = 'application/mercurial-0.1'
28 HGTYPE = 'application/mercurial-0.1'
29 HGTYPE2 = 'application/mercurial-0.2'
29 HGTYPE2 = 'application/mercurial-0.2'
30 HGERRTYPE = 'application/hg-error'
30 HGERRTYPE = 'application/hg-error'
31
31
32 def decodevaluefromheaders(req, headerprefix):
32 def decodevaluefromheaders(req, headerprefix):
33 """Decode a long value from multiple HTTP request headers.
33 """Decode a long value from multiple HTTP request headers.
34
34
35 Returns the value as a bytes, not a str.
35 Returns the value as a bytes, not a str.
36 """
36 """
37 chunks = []
37 chunks = []
38 i = 1
38 i = 1
39 prefix = headerprefix.upper().replace(r'-', r'_')
39 prefix = headerprefix.upper().replace(r'-', r'_')
40 while True:
40 while True:
41 v = req.env.get(r'HTTP_%s_%d' % (prefix, i))
41 v = req.env.get(r'HTTP_%s_%d' % (prefix, i))
42 if v is None:
42 if v is None:
43 break
43 break
44 chunks.append(pycompat.bytesurl(v))
44 chunks.append(pycompat.bytesurl(v))
45 i += 1
45 i += 1
46
46
47 return ''.join(chunks)
47 return ''.join(chunks)
48
48
49 class webproto(wireproto.abstractserverproto):
49 class webproto(wireproto.abstractserverproto):
50 def __init__(self, req, ui):
50 def __init__(self, req, ui):
51 self.req = req
51 self.req = req
52 self.response = ''
52 self.response = ''
53 self.ui = ui
53 self.ui = ui
54 self.name = 'http'
54 self.name = 'http'
55
55
56 def getargs(self, args):
56 def getargs(self, args):
57 knownargs = self._args()
57 knownargs = self._args()
58 data = {}
58 data = {}
59 keys = args.split()
59 keys = args.split()
60 for k in keys:
60 for k in keys:
61 if k == '*':
61 if k == '*':
62 star = {}
62 star = {}
63 for key in knownargs.keys():
63 for key in knownargs.keys():
64 if key != 'cmd' and key not in keys:
64 if key != 'cmd' and key not in keys:
65 star[key] = knownargs[key][0]
65 star[key] = knownargs[key][0]
66 data['*'] = star
66 data['*'] = star
67 else:
67 else:
68 data[k] = knownargs[k][0]
68 data[k] = knownargs[k][0]
69 return [data[k] for k in keys]
69 return [data[k] for k in keys]
70 def _args(self):
70 def _args(self):
71 args = self.req.form.copy()
71 args = self.req.form.copy()
72 if pycompat.ispy3:
72 if pycompat.ispy3:
73 args = {k.encode('ascii'): [v.encode('ascii') for v in vs]
73 args = {k.encode('ascii'): [v.encode('ascii') for v in vs]
74 for k, vs in args.items()}
74 for k, vs in args.items()}
75 postlen = int(self.req.env.get(r'HTTP_X_HGARGS_POST', 0))
75 postlen = int(self.req.env.get(r'HTTP_X_HGARGS_POST', 0))
76 if postlen:
76 if postlen:
77 args.update(cgi.parse_qs(
77 args.update(cgi.parse_qs(
78 self.req.read(postlen), keep_blank_values=True))
78 self.req.read(postlen), keep_blank_values=True))
79 return args
79 return args
80
80
81 argvalue = decodevaluefromheaders(self.req, r'X-HgArg')
81 argvalue = decodevaluefromheaders(self.req, r'X-HgArg')
82 args.update(cgi.parse_qs(argvalue, keep_blank_values=True))
82 args.update(cgi.parse_qs(argvalue, keep_blank_values=True))
83 return args
83 return args
84 def getfile(self, fp):
84 def getfile(self, fp):
85 length = int(self.req.env[r'CONTENT_LENGTH'])
85 length = int(self.req.env[r'CONTENT_LENGTH'])
86 # If httppostargs is used, we need to read Content-Length
86 # If httppostargs is used, we need to read Content-Length
87 # minus the amount that was consumed by args.
87 # minus the amount that was consumed by args.
88 length -= int(self.req.env.get(r'HTTP_X_HGARGS_POST', 0))
88 length -= int(self.req.env.get(r'HTTP_X_HGARGS_POST', 0))
89 for s in util.filechunkiter(self.req, limit=length):
89 for s in util.filechunkiter(self.req, limit=length):
90 fp.write(s)
90 fp.write(s)
91 def redirect(self):
91 def redirect(self):
92 self.oldio = self.ui.fout, self.ui.ferr
92 self.oldio = self.ui.fout, self.ui.ferr
93 self.ui.ferr = self.ui.fout = stringio()
93 self.ui.ferr = self.ui.fout = stringio()
94 def restore(self):
94 def restore(self):
95 val = self.ui.fout.getvalue()
95 val = self.ui.fout.getvalue()
96 self.ui.ferr, self.ui.fout = self.oldio
96 self.ui.ferr, self.ui.fout = self.oldio
97 return val
97 return val
98
98
99 def _client(self):
99 def _client(self):
100 return 'remote:%s:%s:%s' % (
100 return 'remote:%s:%s:%s' % (
101 self.req.env.get('wsgi.url_scheme') or 'http',
101 self.req.env.get('wsgi.url_scheme') or 'http',
102 urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
102 urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
103 urlreq.quote(self.req.env.get('REMOTE_USER', '')))
103 urlreq.quote(self.req.env.get('REMOTE_USER', '')))
104
104
105 def responsetype(self, v1compressible=False):
105 def responsetype(self, prefer_uncompressed):
106 """Determine the appropriate response type and compression settings.
106 """Determine the appropriate response type and compression settings.
107
107
108 The ``v1compressible`` argument states whether the response with
109 application/mercurial-0.1 media types should be zlib compressed.
110
111 Returns a tuple of (mediatype, compengine, engineopts).
108 Returns a tuple of (mediatype, compengine, engineopts).
112 """
109 """
113 # For now, if it isn't compressible in the old world, it's never
114 # compressible. We can change this to send uncompressed 0.2 payloads
115 # later.
116 if not v1compressible:
117 return HGTYPE, None, None
118
119 # Determine the response media type and compression engine based
110 # Determine the response media type and compression engine based
120 # on the request parameters.
111 # on the request parameters.
121 protocaps = decodevaluefromheaders(self.req, r'X-HgProto').split(' ')
112 protocaps = decodevaluefromheaders(self.req, r'X-HgProto').split(' ')
122
113
123 if '0.2' in protocaps:
114 if '0.2' in protocaps:
115 # All clients are expected to support uncompressed data.
116 if prefer_uncompressed:
117 return HGTYPE2, util._noopengine(), {}
118
124 # Default as defined by wire protocol spec.
119 # Default as defined by wire protocol spec.
125 compformats = ['zlib', 'none']
120 compformats = ['zlib', 'none']
126 for cap in protocaps:
121 for cap in protocaps:
127 if cap.startswith('comp='):
122 if cap.startswith('comp='):
128 compformats = cap[5:].split(',')
123 compformats = cap[5:].split(',')
129 break
124 break
130
125
131 # Now find an agreed upon compression format.
126 # Now find an agreed upon compression format.
132 for engine in wireproto.supportedcompengines(self.ui, self,
127 for engine in wireproto.supportedcompengines(self.ui, self,
133 util.SERVERROLE):
128 util.SERVERROLE):
134 if engine.wireprotosupport().name in compformats:
129 if engine.wireprotosupport().name in compformats:
135 opts = {}
130 opts = {}
136 level = self.ui.configint('server',
131 level = self.ui.configint('server',
137 '%slevel' % engine.name())
132 '%slevel' % engine.name())
138 if level is not None:
133 if level is not None:
139 opts['level'] = level
134 opts['level'] = level
140
135
141 return HGTYPE2, engine, opts
136 return HGTYPE2, engine, opts
142
137
143 # No mutually supported compression format. Fall back to the
138 # No mutually supported compression format. Fall back to the
144 # legacy protocol.
139 # legacy protocol.
145
140
146 # Don't allow untrusted settings because disabling compression or
141 # Don't allow untrusted settings because disabling compression or
147 # setting a very high compression level could lead to flooding
142 # setting a very high compression level could lead to flooding
148 # the server's network or CPU.
143 # the server's network or CPU.
149 opts = {'level': self.ui.configint('server', 'zliblevel')}
144 opts = {'level': self.ui.configint('server', 'zliblevel')}
150 return HGTYPE, util.compengines['zlib'], opts
145 return HGTYPE, util.compengines['zlib'], opts
151
146
152 def iscmd(cmd):
147 def iscmd(cmd):
153 return cmd in wireproto.commands
148 return cmd in wireproto.commands
154
149
155 def call(repo, req, cmd):
150 def call(repo, req, cmd):
156 p = webproto(req, repo.ui)
151 p = webproto(req, repo.ui)
157
152
158 def genversion2(gen, compress, engine, engineopts):
153 def genversion2(gen, engine, engineopts):
159 # application/mercurial-0.2 always sends a payload header
154 # application/mercurial-0.2 always sends a payload header
160 # identifying the compression engine.
155 # identifying the compression engine.
161 name = engine.wireprotosupport().name
156 name = engine.wireprotosupport().name
162 assert 0 < len(name) < 256
157 assert 0 < len(name) < 256
163 yield struct.pack('B', len(name))
158 yield struct.pack('B', len(name))
164 yield name
159 yield name
165
160
166 if compress:
161 for chunk in gen:
167 for chunk in engine.compressstream(gen, opts=engineopts):
162 yield chunk
168 yield chunk
169 else:
170 for chunk in gen:
171 yield chunk
172
163
173 rsp = wireproto.dispatch(repo, p, cmd)
164 rsp = wireproto.dispatch(repo, p, cmd)
174 if isinstance(rsp, bytes):
165 if isinstance(rsp, bytes):
175 req.respond(HTTP_OK, HGTYPE, body=rsp)
166 req.respond(HTTP_OK, HGTYPE, body=rsp)
176 return []
167 return []
168 elif isinstance(rsp, wireproto.streamres_legacy):
169 gen = rsp.gen
170 req.respond(HTTP_OK, HGTYPE)
171 return gen
177 elif isinstance(rsp, wireproto.streamres):
172 elif isinstance(rsp, wireproto.streamres):
178 gen = rsp.gen
173 gen = rsp.gen
179
174
180 # This code for compression should not be streamres specific. It
175 # This code for compression should not be streamres specific. It
181 # is here because we only compress streamres at the moment.
176 # is here because we only compress streamres at the moment.
182 mediatype, engine, engineopts = p.responsetype(rsp.v1compressible)
177 mediatype, engine, engineopts = p.responsetype(rsp.prefer_uncompressed)
178 gen = engine.compressstream(gen, engineopts)
183
179
184 if mediatype == HGTYPE and rsp.v1compressible:
180 if mediatype == HGTYPE2:
185 gen = engine.compressstream(gen, engineopts)
181 gen = genversion2(gen, engine, engineopts)
186 elif mediatype == HGTYPE2:
187 gen = genversion2(gen, rsp.v1compressible, engine, engineopts)
188
182
189 req.respond(HTTP_OK, mediatype)
183 req.respond(HTTP_OK, mediatype)
190 return gen
184 return gen
191 elif isinstance(rsp, wireproto.pushres):
185 elif isinstance(rsp, wireproto.pushres):
192 val = p.restore()
186 val = p.restore()
193 rsp = '%d\n%s' % (rsp.res, val)
187 rsp = '%d\n%s' % (rsp.res, val)
194 req.respond(HTTP_OK, HGTYPE, body=rsp)
188 req.respond(HTTP_OK, HGTYPE, body=rsp)
195 return []
189 return []
196 elif isinstance(rsp, wireproto.pusherr):
190 elif isinstance(rsp, wireproto.pusherr):
197 # drain the incoming bundle
191 # drain the incoming bundle
198 req.drain()
192 req.drain()
199 p.restore()
193 p.restore()
200 rsp = '0\n%s\n' % rsp.res
194 rsp = '0\n%s\n' % rsp.res
201 req.respond(HTTP_OK, HGTYPE, body=rsp)
195 req.respond(HTTP_OK, HGTYPE, body=rsp)
202 return []
196 return []
203 elif isinstance(rsp, wireproto.ooberror):
197 elif isinstance(rsp, wireproto.ooberror):
204 rsp = rsp.message
198 rsp = rsp.message
205 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
199 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
206 return []
200 return []
207 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
201 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
@@ -1,130 +1,131
1 # sshserver.py - ssh protocol server support for mercurial
1 # sshserver.py - ssh protocol server support for mercurial
2 #
2 #
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from __future__ import absolute_import
9 from __future__ import absolute_import
10
10
11 import sys
11 import sys
12
12
13 from .i18n import _
13 from .i18n import _
14 from . import (
14 from . import (
15 encoding,
15 encoding,
16 error,
16 error,
17 hook,
17 hook,
18 util,
18 util,
19 wireproto,
19 wireproto,
20 )
20 )
21
21
22 class sshserver(wireproto.abstractserverproto):
22 class sshserver(wireproto.abstractserverproto):
23 def __init__(self, ui, repo):
23 def __init__(self, ui, repo):
24 self.ui = ui
24 self.ui = ui
25 self.repo = repo
25 self.repo = repo
26 self.lock = None
26 self.lock = None
27 self.fin = ui.fin
27 self.fin = ui.fin
28 self.fout = ui.fout
28 self.fout = ui.fout
29 self.name = 'ssh'
29 self.name = 'ssh'
30
30
31 hook.redirect(True)
31 hook.redirect(True)
32 ui.fout = repo.ui.fout = ui.ferr
32 ui.fout = repo.ui.fout = ui.ferr
33
33
34 # Prevent insertion/deletion of CRs
34 # Prevent insertion/deletion of CRs
35 util.setbinary(self.fin)
35 util.setbinary(self.fin)
36 util.setbinary(self.fout)
36 util.setbinary(self.fout)
37
37
38 def getargs(self, args):
38 def getargs(self, args):
39 data = {}
39 data = {}
40 keys = args.split()
40 keys = args.split()
41 for n in xrange(len(keys)):
41 for n in xrange(len(keys)):
42 argline = self.fin.readline()[:-1]
42 argline = self.fin.readline()[:-1]
43 arg, l = argline.split()
43 arg, l = argline.split()
44 if arg not in keys:
44 if arg not in keys:
45 raise error.Abort(_("unexpected parameter %r") % arg)
45 raise error.Abort(_("unexpected parameter %r") % arg)
46 if arg == '*':
46 if arg == '*':
47 star = {}
47 star = {}
48 for k in xrange(int(l)):
48 for k in xrange(int(l)):
49 argline = self.fin.readline()[:-1]
49 argline = self.fin.readline()[:-1]
50 arg, l = argline.split()
50 arg, l = argline.split()
51 val = self.fin.read(int(l))
51 val = self.fin.read(int(l))
52 star[arg] = val
52 star[arg] = val
53 data['*'] = star
53 data['*'] = star
54 else:
54 else:
55 val = self.fin.read(int(l))
55 val = self.fin.read(int(l))
56 data[arg] = val
56 data[arg] = val
57 return [data[k] for k in keys]
57 return [data[k] for k in keys]
58
58
59 def getarg(self, name):
59 def getarg(self, name):
60 return self.getargs(name)[0]
60 return self.getargs(name)[0]
61
61
62 def getfile(self, fpout):
62 def getfile(self, fpout):
63 self.sendresponse('')
63 self.sendresponse('')
64 count = int(self.fin.readline())
64 count = int(self.fin.readline())
65 while count:
65 while count:
66 fpout.write(self.fin.read(count))
66 fpout.write(self.fin.read(count))
67 count = int(self.fin.readline())
67 count = int(self.fin.readline())
68
68
69 def redirect(self):
69 def redirect(self):
70 pass
70 pass
71
71
72 def sendresponse(self, v):
72 def sendresponse(self, v):
73 self.fout.write("%d\n" % len(v))
73 self.fout.write("%d\n" % len(v))
74 self.fout.write(v)
74 self.fout.write(v)
75 self.fout.flush()
75 self.fout.flush()
76
76
77 def sendstream(self, source):
77 def sendstream(self, source):
78 write = self.fout.write
78 write = self.fout.write
79 for chunk in source.gen:
79 for chunk in source.gen:
80 write(chunk)
80 write(chunk)
81 self.fout.flush()
81 self.fout.flush()
82
82
83 def sendpushresponse(self, rsp):
83 def sendpushresponse(self, rsp):
84 self.sendresponse('')
84 self.sendresponse('')
85 self.sendresponse(str(rsp.res))
85 self.sendresponse(str(rsp.res))
86
86
87 def sendpusherror(self, rsp):
87 def sendpusherror(self, rsp):
88 self.sendresponse(rsp.res)
88 self.sendresponse(rsp.res)
89
89
90 def sendooberror(self, rsp):
90 def sendooberror(self, rsp):
91 self.ui.ferr.write('%s\n-\n' % rsp.message)
91 self.ui.ferr.write('%s\n-\n' % rsp.message)
92 self.ui.ferr.flush()
92 self.ui.ferr.flush()
93 self.fout.write('\n')
93 self.fout.write('\n')
94 self.fout.flush()
94 self.fout.flush()
95
95
96 def serve_forever(self):
96 def serve_forever(self):
97 try:
97 try:
98 while self.serve_one():
98 while self.serve_one():
99 pass
99 pass
100 finally:
100 finally:
101 if self.lock is not None:
101 if self.lock is not None:
102 self.lock.release()
102 self.lock.release()
103 sys.exit(0)
103 sys.exit(0)
104
104
105 handlers = {
105 handlers = {
106 str: sendresponse,
106 str: sendresponse,
107 wireproto.streamres: sendstream,
107 wireproto.streamres: sendstream,
108 wireproto.streamres_legacy: sendstream,
108 wireproto.pushres: sendpushresponse,
109 wireproto.pushres: sendpushresponse,
109 wireproto.pusherr: sendpusherror,
110 wireproto.pusherr: sendpusherror,
110 wireproto.ooberror: sendooberror,
111 wireproto.ooberror: sendooberror,
111 }
112 }
112
113
113 def serve_one(self):
114 def serve_one(self):
114 cmd = self.fin.readline()[:-1]
115 cmd = self.fin.readline()[:-1]
115 if cmd and cmd in wireproto.commands:
116 if cmd and cmd in wireproto.commands:
116 rsp = wireproto.dispatch(self.repo, self, cmd)
117 rsp = wireproto.dispatch(self.repo, self, cmd)
117 self.handlers[rsp.__class__](self, rsp)
118 self.handlers[rsp.__class__](self, rsp)
118 elif cmd:
119 elif cmd:
119 impl = getattr(self, 'do_' + cmd, None)
120 impl = getattr(self, 'do_' + cmd, None)
120 if impl:
121 if impl:
121 r = impl()
122 r = impl()
122 if r is not None:
123 if r is not None:
123 self.sendresponse(r)
124 self.sendresponse(r)
124 else:
125 else:
125 self.sendresponse("")
126 self.sendresponse("")
126 return cmd != ''
127 return cmd != ''
127
128
128 def _client(self):
129 def _client(self):
129 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
130 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
130 return 'remote:ssh:' + client
131 return 'remote:ssh:' + client
@@ -1,1057 +1,1068
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import os
11 import os
12 import tempfile
12 import tempfile
13
13
14 from .i18n import _
14 from .i18n import _
15 from .node import (
15 from .node import (
16 bin,
16 bin,
17 hex,
17 hex,
18 nullid,
18 nullid,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 discovery,
24 discovery,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 pycompat,
30 pycompat,
31 repository,
31 repository,
32 streamclone,
32 streamclone,
33 util,
33 util,
34 )
34 )
35
35
36 urlerr = util.urlerr
36 urlerr = util.urlerr
37 urlreq = util.urlreq
37 urlreq = util.urlreq
38
38
39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 'IncompatibleClient')
41 'IncompatibleClient')
42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43
43
44 class abstractserverproto(object):
44 class abstractserverproto(object):
45 """abstract class that summarizes the protocol API
45 """abstract class that summarizes the protocol API
46
46
47 Used as reference and documentation.
47 Used as reference and documentation.
48 """
48 """
49
49
50 def getargs(self, args):
50 def getargs(self, args):
51 """return the value for arguments in <args>
51 """return the value for arguments in <args>
52
52
53 returns a list of values (same order as <args>)"""
53 returns a list of values (same order as <args>)"""
54 raise NotImplementedError()
54 raise NotImplementedError()
55
55
56 def getfile(self, fp):
56 def getfile(self, fp):
57 """write the whole content of a file into a file like object
57 """write the whole content of a file into a file like object
58
58
59 The file is in the form::
59 The file is in the form::
60
60
61 (<chunk-size>\n<chunk>)+0\n
61 (<chunk-size>\n<chunk>)+0\n
62
62
63 chunk size is the ascii version of the int.
63 chunk size is the ascii version of the int.
64 """
64 """
65 raise NotImplementedError()
65 raise NotImplementedError()
66
66
67 def redirect(self):
67 def redirect(self):
68 """may setup interception for stdout and stderr
68 """may setup interception for stdout and stderr
69
69
70 See also the `restore` method."""
70 See also the `restore` method."""
71 raise NotImplementedError()
71 raise NotImplementedError()
72
72
73 # If the `redirect` function does install interception, the `restore`
73 # If the `redirect` function does install interception, the `restore`
74 # function MUST be defined. If interception is not used, this function
74 # function MUST be defined. If interception is not used, this function
75 # MUST NOT be defined.
75 # MUST NOT be defined.
76 #
76 #
77 # left commented here on purpose
77 # left commented here on purpose
78 #
78 #
79 #def restore(self):
79 #def restore(self):
80 # """reinstall previous stdout and stderr and return intercepted stdout
80 # """reinstall previous stdout and stderr and return intercepted stdout
81 # """
81 # """
82 # raise NotImplementedError()
82 # raise NotImplementedError()
83
83
84 class remoteiterbatcher(peer.iterbatcher):
84 class remoteiterbatcher(peer.iterbatcher):
85 def __init__(self, remote):
85 def __init__(self, remote):
86 super(remoteiterbatcher, self).__init__()
86 super(remoteiterbatcher, self).__init__()
87 self._remote = remote
87 self._remote = remote
88
88
89 def __getattr__(self, name):
89 def __getattr__(self, name):
90 # Validate this method is batchable, since submit() only supports
90 # Validate this method is batchable, since submit() only supports
91 # batchable methods.
91 # batchable methods.
92 fn = getattr(self._remote, name)
92 fn = getattr(self._remote, name)
93 if not getattr(fn, 'batchable', None):
93 if not getattr(fn, 'batchable', None):
94 raise error.ProgrammingError('Attempted to batch a non-batchable '
94 raise error.ProgrammingError('Attempted to batch a non-batchable '
95 'call to %r' % name)
95 'call to %r' % name)
96
96
97 return super(remoteiterbatcher, self).__getattr__(name)
97 return super(remoteiterbatcher, self).__getattr__(name)
98
98
99 def submit(self):
99 def submit(self):
100 """Break the batch request into many patch calls and pipeline them.
100 """Break the batch request into many patch calls and pipeline them.
101
101
102 This is mostly valuable over http where request sizes can be
102 This is mostly valuable over http where request sizes can be
103 limited, but can be used in other places as well.
103 limited, but can be used in other places as well.
104 """
104 """
105 # 2-tuple of (command, arguments) that represents what will be
105 # 2-tuple of (command, arguments) that represents what will be
106 # sent over the wire.
106 # sent over the wire.
107 requests = []
107 requests = []
108
108
109 # 4-tuple of (command, final future, @batchable generator, remote
109 # 4-tuple of (command, final future, @batchable generator, remote
110 # future).
110 # future).
111 results = []
111 results = []
112
112
113 for command, args, opts, finalfuture in self.calls:
113 for command, args, opts, finalfuture in self.calls:
114 mtd = getattr(self._remote, command)
114 mtd = getattr(self._remote, command)
115 batchable = mtd.batchable(mtd.__self__, *args, **opts)
115 batchable = mtd.batchable(mtd.__self__, *args, **opts)
116
116
117 commandargs, fremote = next(batchable)
117 commandargs, fremote = next(batchable)
118 assert fremote
118 assert fremote
119 requests.append((command, commandargs))
119 requests.append((command, commandargs))
120 results.append((command, finalfuture, batchable, fremote))
120 results.append((command, finalfuture, batchable, fremote))
121
121
122 if requests:
122 if requests:
123 self._resultiter = self._remote._submitbatch(requests)
123 self._resultiter = self._remote._submitbatch(requests)
124
124
125 self._results = results
125 self._results = results
126
126
127 def results(self):
127 def results(self):
128 for command, finalfuture, batchable, remotefuture in self._results:
128 for command, finalfuture, batchable, remotefuture in self._results:
129 # Get the raw result, set it in the remote future, feed it
129 # Get the raw result, set it in the remote future, feed it
130 # back into the @batchable generator so it can be decoded, and
130 # back into the @batchable generator so it can be decoded, and
131 # set the result on the final future to this value.
131 # set the result on the final future to this value.
132 remoteresult = next(self._resultiter)
132 remoteresult = next(self._resultiter)
133 remotefuture.set(remoteresult)
133 remotefuture.set(remoteresult)
134 finalfuture.set(next(batchable))
134 finalfuture.set(next(batchable))
135
135
136 # Verify our @batchable generators only emit 2 values.
136 # Verify our @batchable generators only emit 2 values.
137 try:
137 try:
138 next(batchable)
138 next(batchable)
139 except StopIteration:
139 except StopIteration:
140 pass
140 pass
141 else:
141 else:
142 raise error.ProgrammingError('%s @batchable generator emitted '
142 raise error.ProgrammingError('%s @batchable generator emitted '
143 'unexpected value count' % command)
143 'unexpected value count' % command)
144
144
145 yield finalfuture.value
145 yield finalfuture.value
146
146
147 # Forward a couple of names from peer to make wireproto interactions
147 # Forward a couple of names from peer to make wireproto interactions
148 # slightly more sensible.
148 # slightly more sensible.
149 batchable = peer.batchable
149 batchable = peer.batchable
150 future = peer.future
150 future = peer.future
151
151
152 # list of nodes encoding / decoding
152 # list of nodes encoding / decoding
153
153
154 def decodelist(l, sep=' '):
154 def decodelist(l, sep=' '):
155 if l:
155 if l:
156 return [bin(v) for v in l.split(sep)]
156 return [bin(v) for v in l.split(sep)]
157 return []
157 return []
158
158
159 def encodelist(l, sep=' '):
159 def encodelist(l, sep=' '):
160 try:
160 try:
161 return sep.join(map(hex, l))
161 return sep.join(map(hex, l))
162 except TypeError:
162 except TypeError:
163 raise
163 raise
164
164
165 # batched call argument encoding
165 # batched call argument encoding
166
166
167 def escapearg(plain):
167 def escapearg(plain):
168 return (plain
168 return (plain
169 .replace(':', ':c')
169 .replace(':', ':c')
170 .replace(',', ':o')
170 .replace(',', ':o')
171 .replace(';', ':s')
171 .replace(';', ':s')
172 .replace('=', ':e'))
172 .replace('=', ':e'))
173
173
174 def unescapearg(escaped):
174 def unescapearg(escaped):
175 return (escaped
175 return (escaped
176 .replace(':e', '=')
176 .replace(':e', '=')
177 .replace(':s', ';')
177 .replace(':s', ';')
178 .replace(':o', ',')
178 .replace(':o', ',')
179 .replace(':c', ':'))
179 .replace(':c', ':'))
180
180
181 def encodebatchcmds(req):
181 def encodebatchcmds(req):
182 """Return a ``cmds`` argument value for the ``batch`` command."""
182 """Return a ``cmds`` argument value for the ``batch`` command."""
183 cmds = []
183 cmds = []
184 for op, argsdict in req:
184 for op, argsdict in req:
185 # Old servers didn't properly unescape argument names. So prevent
185 # Old servers didn't properly unescape argument names. So prevent
186 # the sending of argument names that may not be decoded properly by
186 # the sending of argument names that may not be decoded properly by
187 # servers.
187 # servers.
188 assert all(escapearg(k) == k for k in argsdict)
188 assert all(escapearg(k) == k for k in argsdict)
189
189
190 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
190 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
191 for k, v in argsdict.iteritems())
191 for k, v in argsdict.iteritems())
192 cmds.append('%s %s' % (op, args))
192 cmds.append('%s %s' % (op, args))
193
193
194 return ';'.join(cmds)
194 return ';'.join(cmds)
195
195
196 # mapping of options accepted by getbundle and their types
196 # mapping of options accepted by getbundle and their types
197 #
197 #
198 # Meant to be extended by extensions. It is extensions responsibility to ensure
198 # Meant to be extended by extensions. It is extensions responsibility to ensure
199 # such options are properly processed in exchange.getbundle.
199 # such options are properly processed in exchange.getbundle.
200 #
200 #
201 # supported types are:
201 # supported types are:
202 #
202 #
203 # :nodes: list of binary nodes
203 # :nodes: list of binary nodes
204 # :csv: list of comma-separated values
204 # :csv: list of comma-separated values
205 # :scsv: list of comma-separated values return as set
205 # :scsv: list of comma-separated values return as set
206 # :plain: string with no transformation needed.
206 # :plain: string with no transformation needed.
207 gboptsmap = {'heads': 'nodes',
207 gboptsmap = {'heads': 'nodes',
208 'bookmarks': 'boolean',
208 'bookmarks': 'boolean',
209 'common': 'nodes',
209 'common': 'nodes',
210 'obsmarkers': 'boolean',
210 'obsmarkers': 'boolean',
211 'phases': 'boolean',
211 'phases': 'boolean',
212 'bundlecaps': 'scsv',
212 'bundlecaps': 'scsv',
213 'listkeys': 'csv',
213 'listkeys': 'csv',
214 'cg': 'boolean',
214 'cg': 'boolean',
215 'cbattempted': 'boolean'}
215 'cbattempted': 'boolean'}
216
216
217 # client side
217 # client side
218
218
219 class wirepeer(repository.legacypeer):
219 class wirepeer(repository.legacypeer):
220 """Client-side interface for communicating with a peer repository.
220 """Client-side interface for communicating with a peer repository.
221
221
222 Methods commonly call wire protocol commands of the same name.
222 Methods commonly call wire protocol commands of the same name.
223
223
224 See also httppeer.py and sshpeer.py for protocol-specific
224 See also httppeer.py and sshpeer.py for protocol-specific
225 implementations of this interface.
225 implementations of this interface.
226 """
226 """
227 # Begin of basewirepeer interface.
227 # Begin of basewirepeer interface.
228
228
229 def iterbatch(self):
229 def iterbatch(self):
230 return remoteiterbatcher(self)
230 return remoteiterbatcher(self)
231
231
232 @batchable
232 @batchable
233 def lookup(self, key):
233 def lookup(self, key):
234 self.requirecap('lookup', _('look up remote revision'))
234 self.requirecap('lookup', _('look up remote revision'))
235 f = future()
235 f = future()
236 yield {'key': encoding.fromlocal(key)}, f
236 yield {'key': encoding.fromlocal(key)}, f
237 d = f.value
237 d = f.value
238 success, data = d[:-1].split(" ", 1)
238 success, data = d[:-1].split(" ", 1)
239 if int(success):
239 if int(success):
240 yield bin(data)
240 yield bin(data)
241 else:
241 else:
242 self._abort(error.RepoError(data))
242 self._abort(error.RepoError(data))
243
243
244 @batchable
244 @batchable
245 def heads(self):
245 def heads(self):
246 f = future()
246 f = future()
247 yield {}, f
247 yield {}, f
248 d = f.value
248 d = f.value
249 try:
249 try:
250 yield decodelist(d[:-1])
250 yield decodelist(d[:-1])
251 except ValueError:
251 except ValueError:
252 self._abort(error.ResponseError(_("unexpected response:"), d))
252 self._abort(error.ResponseError(_("unexpected response:"), d))
253
253
254 @batchable
254 @batchable
255 def known(self, nodes):
255 def known(self, nodes):
256 f = future()
256 f = future()
257 yield {'nodes': encodelist(nodes)}, f
257 yield {'nodes': encodelist(nodes)}, f
258 d = f.value
258 d = f.value
259 try:
259 try:
260 yield [bool(int(b)) for b in d]
260 yield [bool(int(b)) for b in d]
261 except ValueError:
261 except ValueError:
262 self._abort(error.ResponseError(_("unexpected response:"), d))
262 self._abort(error.ResponseError(_("unexpected response:"), d))
263
263
264 @batchable
264 @batchable
265 def branchmap(self):
265 def branchmap(self):
266 f = future()
266 f = future()
267 yield {}, f
267 yield {}, f
268 d = f.value
268 d = f.value
269 try:
269 try:
270 branchmap = {}
270 branchmap = {}
271 for branchpart in d.splitlines():
271 for branchpart in d.splitlines():
272 branchname, branchheads = branchpart.split(' ', 1)
272 branchname, branchheads = branchpart.split(' ', 1)
273 branchname = encoding.tolocal(urlreq.unquote(branchname))
273 branchname = encoding.tolocal(urlreq.unquote(branchname))
274 branchheads = decodelist(branchheads)
274 branchheads = decodelist(branchheads)
275 branchmap[branchname] = branchheads
275 branchmap[branchname] = branchheads
276 yield branchmap
276 yield branchmap
277 except TypeError:
277 except TypeError:
278 self._abort(error.ResponseError(_("unexpected response:"), d))
278 self._abort(error.ResponseError(_("unexpected response:"), d))
279
279
280 @batchable
280 @batchable
281 def listkeys(self, namespace):
281 def listkeys(self, namespace):
282 if not self.capable('pushkey'):
282 if not self.capable('pushkey'):
283 yield {}, None
283 yield {}, None
284 f = future()
284 f = future()
285 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
285 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
286 yield {'namespace': encoding.fromlocal(namespace)}, f
286 yield {'namespace': encoding.fromlocal(namespace)}, f
287 d = f.value
287 d = f.value
288 self.ui.debug('received listkey for "%s": %i bytes\n'
288 self.ui.debug('received listkey for "%s": %i bytes\n'
289 % (namespace, len(d)))
289 % (namespace, len(d)))
290 yield pushkeymod.decodekeys(d)
290 yield pushkeymod.decodekeys(d)
291
291
292 @batchable
292 @batchable
293 def pushkey(self, namespace, key, old, new):
293 def pushkey(self, namespace, key, old, new):
294 if not self.capable('pushkey'):
294 if not self.capable('pushkey'):
295 yield False, None
295 yield False, None
296 f = future()
296 f = future()
297 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
297 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
298 yield {'namespace': encoding.fromlocal(namespace),
298 yield {'namespace': encoding.fromlocal(namespace),
299 'key': encoding.fromlocal(key),
299 'key': encoding.fromlocal(key),
300 'old': encoding.fromlocal(old),
300 'old': encoding.fromlocal(old),
301 'new': encoding.fromlocal(new)}, f
301 'new': encoding.fromlocal(new)}, f
302 d = f.value
302 d = f.value
303 d, output = d.split('\n', 1)
303 d, output = d.split('\n', 1)
304 try:
304 try:
305 d = bool(int(d))
305 d = bool(int(d))
306 except ValueError:
306 except ValueError:
307 raise error.ResponseError(
307 raise error.ResponseError(
308 _('push failed (unexpected response):'), d)
308 _('push failed (unexpected response):'), d)
309 for l in output.splitlines(True):
309 for l in output.splitlines(True):
310 self.ui.status(_('remote: '), l)
310 self.ui.status(_('remote: '), l)
311 yield d
311 yield d
312
312
313 def stream_out(self):
313 def stream_out(self):
314 return self._callstream('stream_out')
314 return self._callstream('stream_out')
315
315
316 def getbundle(self, source, **kwargs):
316 def getbundle(self, source, **kwargs):
317 kwargs = pycompat.byteskwargs(kwargs)
317 kwargs = pycompat.byteskwargs(kwargs)
318 self.requirecap('getbundle', _('look up remote changes'))
318 self.requirecap('getbundle', _('look up remote changes'))
319 opts = {}
319 opts = {}
320 bundlecaps = kwargs.get('bundlecaps')
320 bundlecaps = kwargs.get('bundlecaps')
321 if bundlecaps is not None:
321 if bundlecaps is not None:
322 kwargs['bundlecaps'] = sorted(bundlecaps)
322 kwargs['bundlecaps'] = sorted(bundlecaps)
323 else:
323 else:
324 bundlecaps = () # kwargs could have it to None
324 bundlecaps = () # kwargs could have it to None
325 for key, value in kwargs.iteritems():
325 for key, value in kwargs.iteritems():
326 if value is None:
326 if value is None:
327 continue
327 continue
328 keytype = gboptsmap.get(key)
328 keytype = gboptsmap.get(key)
329 if keytype is None:
329 if keytype is None:
330 raise error.ProgrammingError(
330 raise error.ProgrammingError(
331 'Unexpectedly None keytype for key %s' % key)
331 'Unexpectedly None keytype for key %s' % key)
332 elif keytype == 'nodes':
332 elif keytype == 'nodes':
333 value = encodelist(value)
333 value = encodelist(value)
334 elif keytype in ('csv', 'scsv'):
334 elif keytype in ('csv', 'scsv'):
335 value = ','.join(value)
335 value = ','.join(value)
336 elif keytype == 'boolean':
336 elif keytype == 'boolean':
337 value = '%i' % bool(value)
337 value = '%i' % bool(value)
338 elif keytype != 'plain':
338 elif keytype != 'plain':
339 raise KeyError('unknown getbundle option type %s'
339 raise KeyError('unknown getbundle option type %s'
340 % keytype)
340 % keytype)
341 opts[key] = value
341 opts[key] = value
342 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
342 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
343 if any((cap.startswith('HG2') for cap in bundlecaps)):
343 if any((cap.startswith('HG2') for cap in bundlecaps)):
344 return bundle2.getunbundler(self.ui, f)
344 return bundle2.getunbundler(self.ui, f)
345 else:
345 else:
346 return changegroupmod.cg1unpacker(f, 'UN')
346 return changegroupmod.cg1unpacker(f, 'UN')
347
347
348 def unbundle(self, cg, heads, url):
348 def unbundle(self, cg, heads, url):
349 '''Send cg (a readable file-like object representing the
349 '''Send cg (a readable file-like object representing the
350 changegroup to push, typically a chunkbuffer object) to the
350 changegroup to push, typically a chunkbuffer object) to the
351 remote server as a bundle.
351 remote server as a bundle.
352
352
353 When pushing a bundle10 stream, return an integer indicating the
353 When pushing a bundle10 stream, return an integer indicating the
354 result of the push (see changegroup.apply()).
354 result of the push (see changegroup.apply()).
355
355
356 When pushing a bundle20 stream, return a bundle20 stream.
356 When pushing a bundle20 stream, return a bundle20 stream.
357
357
358 `url` is the url the client thinks it's pushing to, which is
358 `url` is the url the client thinks it's pushing to, which is
359 visible to hooks.
359 visible to hooks.
360 '''
360 '''
361
361
362 if heads != ['force'] and self.capable('unbundlehash'):
362 if heads != ['force'] and self.capable('unbundlehash'):
363 heads = encodelist(['hashed',
363 heads = encodelist(['hashed',
364 hashlib.sha1(''.join(sorted(heads))).digest()])
364 hashlib.sha1(''.join(sorted(heads))).digest()])
365 else:
365 else:
366 heads = encodelist(heads)
366 heads = encodelist(heads)
367
367
368 if util.safehasattr(cg, 'deltaheader'):
368 if util.safehasattr(cg, 'deltaheader'):
369 # this a bundle10, do the old style call sequence
369 # this a bundle10, do the old style call sequence
370 ret, output = self._callpush("unbundle", cg, heads=heads)
370 ret, output = self._callpush("unbundle", cg, heads=heads)
371 if ret == "":
371 if ret == "":
372 raise error.ResponseError(
372 raise error.ResponseError(
373 _('push failed:'), output)
373 _('push failed:'), output)
374 try:
374 try:
375 ret = int(ret)
375 ret = int(ret)
376 except ValueError:
376 except ValueError:
377 raise error.ResponseError(
377 raise error.ResponseError(
378 _('push failed (unexpected response):'), ret)
378 _('push failed (unexpected response):'), ret)
379
379
380 for l in output.splitlines(True):
380 for l in output.splitlines(True):
381 self.ui.status(_('remote: '), l)
381 self.ui.status(_('remote: '), l)
382 else:
382 else:
383 # bundle2 push. Send a stream, fetch a stream.
383 # bundle2 push. Send a stream, fetch a stream.
384 stream = self._calltwowaystream('unbundle', cg, heads=heads)
384 stream = self._calltwowaystream('unbundle', cg, heads=heads)
385 ret = bundle2.getunbundler(self.ui, stream)
385 ret = bundle2.getunbundler(self.ui, stream)
386 return ret
386 return ret
387
387
388 # End of basewirepeer interface.
388 # End of basewirepeer interface.
389
389
390 # Begin of baselegacywirepeer interface.
390 # Begin of baselegacywirepeer interface.
391
391
392 def branches(self, nodes):
392 def branches(self, nodes):
393 n = encodelist(nodes)
393 n = encodelist(nodes)
394 d = self._call("branches", nodes=n)
394 d = self._call("branches", nodes=n)
395 try:
395 try:
396 br = [tuple(decodelist(b)) for b in d.splitlines()]
396 br = [tuple(decodelist(b)) for b in d.splitlines()]
397 return br
397 return br
398 except ValueError:
398 except ValueError:
399 self._abort(error.ResponseError(_("unexpected response:"), d))
399 self._abort(error.ResponseError(_("unexpected response:"), d))
400
400
401 def between(self, pairs):
401 def between(self, pairs):
402 batch = 8 # avoid giant requests
402 batch = 8 # avoid giant requests
403 r = []
403 r = []
404 for i in xrange(0, len(pairs), batch):
404 for i in xrange(0, len(pairs), batch):
405 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
405 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
406 d = self._call("between", pairs=n)
406 d = self._call("between", pairs=n)
407 try:
407 try:
408 r.extend(l and decodelist(l) or [] for l in d.splitlines())
408 r.extend(l and decodelist(l) or [] for l in d.splitlines())
409 except ValueError:
409 except ValueError:
410 self._abort(error.ResponseError(_("unexpected response:"), d))
410 self._abort(error.ResponseError(_("unexpected response:"), d))
411 return r
411 return r
412
412
413 def changegroup(self, nodes, kind):
413 def changegroup(self, nodes, kind):
414 n = encodelist(nodes)
414 n = encodelist(nodes)
415 f = self._callcompressable("changegroup", roots=n)
415 f = self._callcompressable("changegroup", roots=n)
416 return changegroupmod.cg1unpacker(f, 'UN')
416 return changegroupmod.cg1unpacker(f, 'UN')
417
417
418 def changegroupsubset(self, bases, heads, kind):
418 def changegroupsubset(self, bases, heads, kind):
419 self.requirecap('changegroupsubset', _('look up remote changes'))
419 self.requirecap('changegroupsubset', _('look up remote changes'))
420 bases = encodelist(bases)
420 bases = encodelist(bases)
421 heads = encodelist(heads)
421 heads = encodelist(heads)
422 f = self._callcompressable("changegroupsubset",
422 f = self._callcompressable("changegroupsubset",
423 bases=bases, heads=heads)
423 bases=bases, heads=heads)
424 return changegroupmod.cg1unpacker(f, 'UN')
424 return changegroupmod.cg1unpacker(f, 'UN')
425
425
426 # End of baselegacywirepeer interface.
426 # End of baselegacywirepeer interface.
427
427
428 def _submitbatch(self, req):
428 def _submitbatch(self, req):
429 """run batch request <req> on the server
429 """run batch request <req> on the server
430
430
431 Returns an iterator of the raw responses from the server.
431 Returns an iterator of the raw responses from the server.
432 """
432 """
433 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
433 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
434 chunk = rsp.read(1024)
434 chunk = rsp.read(1024)
435 work = [chunk]
435 work = [chunk]
436 while chunk:
436 while chunk:
437 while ';' not in chunk and chunk:
437 while ';' not in chunk and chunk:
438 chunk = rsp.read(1024)
438 chunk = rsp.read(1024)
439 work.append(chunk)
439 work.append(chunk)
440 merged = ''.join(work)
440 merged = ''.join(work)
441 while ';' in merged:
441 while ';' in merged:
442 one, merged = merged.split(';', 1)
442 one, merged = merged.split(';', 1)
443 yield unescapearg(one)
443 yield unescapearg(one)
444 chunk = rsp.read(1024)
444 chunk = rsp.read(1024)
445 work = [merged, chunk]
445 work = [merged, chunk]
446 yield unescapearg(''.join(work))
446 yield unescapearg(''.join(work))
447
447
448 def _submitone(self, op, args):
448 def _submitone(self, op, args):
449 return self._call(op, **pycompat.strkwargs(args))
449 return self._call(op, **pycompat.strkwargs(args))
450
450
451 def debugwireargs(self, one, two, three=None, four=None, five=None):
451 def debugwireargs(self, one, two, three=None, four=None, five=None):
452 # don't pass optional arguments left at their default value
452 # don't pass optional arguments left at their default value
453 opts = {}
453 opts = {}
454 if three is not None:
454 if three is not None:
455 opts[r'three'] = three
455 opts[r'three'] = three
456 if four is not None:
456 if four is not None:
457 opts[r'four'] = four
457 opts[r'four'] = four
458 return self._call('debugwireargs', one=one, two=two, **opts)
458 return self._call('debugwireargs', one=one, two=two, **opts)
459
459
460 def _call(self, cmd, **args):
460 def _call(self, cmd, **args):
461 """execute <cmd> on the server
461 """execute <cmd> on the server
462
462
463 The command is expected to return a simple string.
463 The command is expected to return a simple string.
464
464
465 returns the server reply as a string."""
465 returns the server reply as a string."""
466 raise NotImplementedError()
466 raise NotImplementedError()
467
467
468 def _callstream(self, cmd, **args):
468 def _callstream(self, cmd, **args):
469 """execute <cmd> on the server
469 """execute <cmd> on the server
470
470
471 The command is expected to return a stream. Note that if the
471 The command is expected to return a stream. Note that if the
472 command doesn't return a stream, _callstream behaves
472 command doesn't return a stream, _callstream behaves
473 differently for ssh and http peers.
473 differently for ssh and http peers.
474
474
475 returns the server reply as a file like object.
475 returns the server reply as a file like object.
476 """
476 """
477 raise NotImplementedError()
477 raise NotImplementedError()
478
478
479 def _callcompressable(self, cmd, **args):
479 def _callcompressable(self, cmd, **args):
480 """execute <cmd> on the server
480 """execute <cmd> on the server
481
481
482 The command is expected to return a stream.
482 The command is expected to return a stream.
483
483
484 The stream may have been compressed in some implementations. This
484 The stream may have been compressed in some implementations. This
485 function takes care of the decompression. This is the only difference
485 function takes care of the decompression. This is the only difference
486 with _callstream.
486 with _callstream.
487
487
488 returns the server reply as a file like object.
488 returns the server reply as a file like object.
489 """
489 """
490 raise NotImplementedError()
490 raise NotImplementedError()
491
491
492 def _callpush(self, cmd, fp, **args):
492 def _callpush(self, cmd, fp, **args):
493 """execute a <cmd> on server
493 """execute a <cmd> on server
494
494
495 The command is expected to be related to a push. Push has a special
495 The command is expected to be related to a push. Push has a special
496 return method.
496 return method.
497
497
498 returns the server reply as a (ret, output) tuple. ret is either
498 returns the server reply as a (ret, output) tuple. ret is either
499 empty (error) or a stringified int.
499 empty (error) or a stringified int.
500 """
500 """
501 raise NotImplementedError()
501 raise NotImplementedError()
502
502
503 def _calltwowaystream(self, cmd, fp, **args):
503 def _calltwowaystream(self, cmd, fp, **args):
504 """execute <cmd> on server
504 """execute <cmd> on server
505
505
506 The command will send a stream to the server and get a stream in reply.
506 The command will send a stream to the server and get a stream in reply.
507 """
507 """
508 raise NotImplementedError()
508 raise NotImplementedError()
509
509
510 def _abort(self, exception):
510 def _abort(self, exception):
511 """clearly abort the wire protocol connection and raise the exception
511 """clearly abort the wire protocol connection and raise the exception
512 """
512 """
513 raise NotImplementedError()
513 raise NotImplementedError()
514
514
515 # server side
515 # server side
516
516
517 # wire protocol command can either return a string or one of these classes.
517 # wire protocol command can either return a string or one of these classes.
518 class streamres(object):
518 class streamres(object):
519 """wireproto reply: binary stream
519 """wireproto reply: binary stream
520
520
521 The call was successful and the result is a stream.
521 The call was successful and the result is a stream.
522
522
523 Accepts a generator containing chunks of data to be sent to the client.
523 Accepts a generator containing chunks of data to be sent to the client.
524
524
525 ``v1compressible`` indicates whether this data can be compressed to
525 ``prefer_uncompressed`` indicates that the data is expected to be
526 "version 1" clients (technically: HTTP peers using
526 uncompressable and that the stream should therefore use the ``none``
527 application/mercurial-0.1 media type). This flag should NOT be used on
527 engine.
528 new commands because new clients should support a more modern compression
529 mechanism.
530 """
528 """
531 def __init__(self, gen=None, v1compressible=False):
529 def __init__(self, gen=None, prefer_uncompressed=False):
532 self.gen = gen
530 self.gen = gen
533 self.v1compressible = v1compressible
531 self.prefer_uncompressed = prefer_uncompressed
532
533 class streamres_legacy(object):
534 """wireproto reply: uncompressed binary stream
535
536 The call was successful and the result is a stream.
537
538 Accepts a generator containing chunks of data to be sent to the client.
539
540 Like ``streamres``, but sends an uncompressed data for "version 1" clients
541 using the application/mercurial-0.1 media type.
542 """
543 def __init__(self, gen=None):
544 self.gen = gen
534
545
535 class pushres(object):
546 class pushres(object):
536 """wireproto reply: success with simple integer return
547 """wireproto reply: success with simple integer return
537
548
538 The call was successful and returned an integer contained in `self.res`.
549 The call was successful and returned an integer contained in `self.res`.
539 """
550 """
540 def __init__(self, res):
551 def __init__(self, res):
541 self.res = res
552 self.res = res
542
553
543 class pusherr(object):
554 class pusherr(object):
544 """wireproto reply: failure
555 """wireproto reply: failure
545
556
546 The call failed. The `self.res` attribute contains the error message.
557 The call failed. The `self.res` attribute contains the error message.
547 """
558 """
548 def __init__(self, res):
559 def __init__(self, res):
549 self.res = res
560 self.res = res
550
561
551 class ooberror(object):
562 class ooberror(object):
552 """wireproto reply: failure of a batch of operation
563 """wireproto reply: failure of a batch of operation
553
564
554 Something failed during a batch call. The error message is stored in
565 Something failed during a batch call. The error message is stored in
555 `self.message`.
566 `self.message`.
556 """
567 """
557 def __init__(self, message):
568 def __init__(self, message):
558 self.message = message
569 self.message = message
559
570
560 def getdispatchrepo(repo, proto, command):
571 def getdispatchrepo(repo, proto, command):
561 """Obtain the repo used for processing wire protocol commands.
572 """Obtain the repo used for processing wire protocol commands.
562
573
563 The intent of this function is to serve as a monkeypatch point for
574 The intent of this function is to serve as a monkeypatch point for
564 extensions that need commands to operate on different repo views under
575 extensions that need commands to operate on different repo views under
565 specialized circumstances.
576 specialized circumstances.
566 """
577 """
567 return repo.filtered('served')
578 return repo.filtered('served')
568
579
569 def dispatch(repo, proto, command):
580 def dispatch(repo, proto, command):
570 repo = getdispatchrepo(repo, proto, command)
581 repo = getdispatchrepo(repo, proto, command)
571 func, spec = commands[command]
582 func, spec = commands[command]
572 args = proto.getargs(spec)
583 args = proto.getargs(spec)
573 return func(repo, proto, *args)
584 return func(repo, proto, *args)
574
585
575 def options(cmd, keys, others):
586 def options(cmd, keys, others):
576 opts = {}
587 opts = {}
577 for k in keys:
588 for k in keys:
578 if k in others:
589 if k in others:
579 opts[k] = others[k]
590 opts[k] = others[k]
580 del others[k]
591 del others[k]
581 if others:
592 if others:
582 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
593 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
583 % (cmd, ",".join(others)))
594 % (cmd, ",".join(others)))
584 return opts
595 return opts
585
596
586 def bundle1allowed(repo, action):
597 def bundle1allowed(repo, action):
587 """Whether a bundle1 operation is allowed from the server.
598 """Whether a bundle1 operation is allowed from the server.
588
599
589 Priority is:
600 Priority is:
590
601
591 1. server.bundle1gd.<action> (if generaldelta active)
602 1. server.bundle1gd.<action> (if generaldelta active)
592 2. server.bundle1.<action>
603 2. server.bundle1.<action>
593 3. server.bundle1gd (if generaldelta active)
604 3. server.bundle1gd (if generaldelta active)
594 4. server.bundle1
605 4. server.bundle1
595 """
606 """
596 ui = repo.ui
607 ui = repo.ui
597 gd = 'generaldelta' in repo.requirements
608 gd = 'generaldelta' in repo.requirements
598
609
599 if gd:
610 if gd:
600 v = ui.configbool('server', 'bundle1gd.%s' % action)
611 v = ui.configbool('server', 'bundle1gd.%s' % action)
601 if v is not None:
612 if v is not None:
602 return v
613 return v
603
614
604 v = ui.configbool('server', 'bundle1.%s' % action)
615 v = ui.configbool('server', 'bundle1.%s' % action)
605 if v is not None:
616 if v is not None:
606 return v
617 return v
607
618
608 if gd:
619 if gd:
609 v = ui.configbool('server', 'bundle1gd')
620 v = ui.configbool('server', 'bundle1gd')
610 if v is not None:
621 if v is not None:
611 return v
622 return v
612
623
613 return ui.configbool('server', 'bundle1')
624 return ui.configbool('server', 'bundle1')
614
625
615 def supportedcompengines(ui, proto, role):
626 def supportedcompengines(ui, proto, role):
616 """Obtain the list of supported compression engines for a request."""
627 """Obtain the list of supported compression engines for a request."""
617 assert role in (util.CLIENTROLE, util.SERVERROLE)
628 assert role in (util.CLIENTROLE, util.SERVERROLE)
618
629
619 compengines = util.compengines.supportedwireengines(role)
630 compengines = util.compengines.supportedwireengines(role)
620
631
621 # Allow config to override default list and ordering.
632 # Allow config to override default list and ordering.
622 if role == util.SERVERROLE:
633 if role == util.SERVERROLE:
623 configengines = ui.configlist('server', 'compressionengines')
634 configengines = ui.configlist('server', 'compressionengines')
624 config = 'server.compressionengines'
635 config = 'server.compressionengines'
625 else:
636 else:
626 # This is currently implemented mainly to facilitate testing. In most
637 # This is currently implemented mainly to facilitate testing. In most
627 # cases, the server should be in charge of choosing a compression engine
638 # cases, the server should be in charge of choosing a compression engine
628 # because a server has the most to lose from a sub-optimal choice. (e.g.
639 # because a server has the most to lose from a sub-optimal choice. (e.g.
629 # CPU DoS due to an expensive engine or a network DoS due to poor
640 # CPU DoS due to an expensive engine or a network DoS due to poor
630 # compression ratio).
641 # compression ratio).
631 configengines = ui.configlist('experimental',
642 configengines = ui.configlist('experimental',
632 'clientcompressionengines')
643 'clientcompressionengines')
633 config = 'experimental.clientcompressionengines'
644 config = 'experimental.clientcompressionengines'
634
645
635 # No explicit config. Filter out the ones that aren't supposed to be
646 # No explicit config. Filter out the ones that aren't supposed to be
636 # advertised and return default ordering.
647 # advertised and return default ordering.
637 if not configengines:
648 if not configengines:
638 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
649 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
639 return [e for e in compengines
650 return [e for e in compengines
640 if getattr(e.wireprotosupport(), attr) > 0]
651 if getattr(e.wireprotosupport(), attr) > 0]
641
652
642 # If compression engines are listed in the config, assume there is a good
653 # If compression engines are listed in the config, assume there is a good
643 # reason for it (like server operators wanting to achieve specific
654 # reason for it (like server operators wanting to achieve specific
644 # performance characteristics). So fail fast if the config references
655 # performance characteristics). So fail fast if the config references
645 # unusable compression engines.
656 # unusable compression engines.
646 validnames = set(e.name() for e in compengines)
657 validnames = set(e.name() for e in compengines)
647 invalidnames = set(e for e in configengines if e not in validnames)
658 invalidnames = set(e for e in configengines if e not in validnames)
648 if invalidnames:
659 if invalidnames:
649 raise error.Abort(_('invalid compression engine defined in %s: %s') %
660 raise error.Abort(_('invalid compression engine defined in %s: %s') %
650 (config, ', '.join(sorted(invalidnames))))
661 (config, ', '.join(sorted(invalidnames))))
651
662
652 compengines = [e for e in compengines if e.name() in configengines]
663 compengines = [e for e in compengines if e.name() in configengines]
653 compengines = sorted(compengines,
664 compengines = sorted(compengines,
654 key=lambda e: configengines.index(e.name()))
665 key=lambda e: configengines.index(e.name()))
655
666
656 if not compengines:
667 if not compengines:
657 raise error.Abort(_('%s config option does not specify any known '
668 raise error.Abort(_('%s config option does not specify any known '
658 'compression engines') % config,
669 'compression engines') % config,
659 hint=_('usable compression engines: %s') %
670 hint=_('usable compression engines: %s') %
660 ', '.sorted(validnames))
671 ', '.sorted(validnames))
661
672
662 return compengines
673 return compengines
663
674
664 # list of commands
675 # list of commands
665 commands = {}
676 commands = {}
666
677
667 def wireprotocommand(name, args=''):
678 def wireprotocommand(name, args=''):
668 """decorator for wire protocol command"""
679 """decorator for wire protocol command"""
669 def register(func):
680 def register(func):
670 commands[name] = (func, args)
681 commands[name] = (func, args)
671 return func
682 return func
672 return register
683 return register
673
684
674 @wireprotocommand('batch', 'cmds *')
685 @wireprotocommand('batch', 'cmds *')
675 def batch(repo, proto, cmds, others):
686 def batch(repo, proto, cmds, others):
676 repo = repo.filtered("served")
687 repo = repo.filtered("served")
677 res = []
688 res = []
678 for pair in cmds.split(';'):
689 for pair in cmds.split(';'):
679 op, args = pair.split(' ', 1)
690 op, args = pair.split(' ', 1)
680 vals = {}
691 vals = {}
681 for a in args.split(','):
692 for a in args.split(','):
682 if a:
693 if a:
683 n, v = a.split('=')
694 n, v = a.split('=')
684 vals[unescapearg(n)] = unescapearg(v)
695 vals[unescapearg(n)] = unescapearg(v)
685 func, spec = commands[op]
696 func, spec = commands[op]
686 if spec:
697 if spec:
687 keys = spec.split()
698 keys = spec.split()
688 data = {}
699 data = {}
689 for k in keys:
700 for k in keys:
690 if k == '*':
701 if k == '*':
691 star = {}
702 star = {}
692 for key in vals.keys():
703 for key in vals.keys():
693 if key not in keys:
704 if key not in keys:
694 star[key] = vals[key]
705 star[key] = vals[key]
695 data['*'] = star
706 data['*'] = star
696 else:
707 else:
697 data[k] = vals[k]
708 data[k] = vals[k]
698 result = func(repo, proto, *[data[k] for k in keys])
709 result = func(repo, proto, *[data[k] for k in keys])
699 else:
710 else:
700 result = func(repo, proto)
711 result = func(repo, proto)
701 if isinstance(result, ooberror):
712 if isinstance(result, ooberror):
702 return result
713 return result
703 res.append(escapearg(result))
714 res.append(escapearg(result))
704 return ';'.join(res)
715 return ';'.join(res)
705
716
706 @wireprotocommand('between', 'pairs')
717 @wireprotocommand('between', 'pairs')
707 def between(repo, proto, pairs):
718 def between(repo, proto, pairs):
708 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
719 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
709 r = []
720 r = []
710 for b in repo.between(pairs):
721 for b in repo.between(pairs):
711 r.append(encodelist(b) + "\n")
722 r.append(encodelist(b) + "\n")
712 return "".join(r)
723 return "".join(r)
713
724
714 @wireprotocommand('branchmap')
725 @wireprotocommand('branchmap')
715 def branchmap(repo, proto):
726 def branchmap(repo, proto):
716 branchmap = repo.branchmap()
727 branchmap = repo.branchmap()
717 heads = []
728 heads = []
718 for branch, nodes in branchmap.iteritems():
729 for branch, nodes in branchmap.iteritems():
719 branchname = urlreq.quote(encoding.fromlocal(branch))
730 branchname = urlreq.quote(encoding.fromlocal(branch))
720 branchnodes = encodelist(nodes)
731 branchnodes = encodelist(nodes)
721 heads.append('%s %s' % (branchname, branchnodes))
732 heads.append('%s %s' % (branchname, branchnodes))
722 return '\n'.join(heads)
733 return '\n'.join(heads)
723
734
724 @wireprotocommand('branches', 'nodes')
735 @wireprotocommand('branches', 'nodes')
725 def branches(repo, proto, nodes):
736 def branches(repo, proto, nodes):
726 nodes = decodelist(nodes)
737 nodes = decodelist(nodes)
727 r = []
738 r = []
728 for b in repo.branches(nodes):
739 for b in repo.branches(nodes):
729 r.append(encodelist(b) + "\n")
740 r.append(encodelist(b) + "\n")
730 return "".join(r)
741 return "".join(r)
731
742
732 @wireprotocommand('clonebundles', '')
743 @wireprotocommand('clonebundles', '')
733 def clonebundles(repo, proto):
744 def clonebundles(repo, proto):
734 """Server command for returning info for available bundles to seed clones.
745 """Server command for returning info for available bundles to seed clones.
735
746
736 Clients will parse this response and determine what bundle to fetch.
747 Clients will parse this response and determine what bundle to fetch.
737
748
738 Extensions may wrap this command to filter or dynamically emit data
749 Extensions may wrap this command to filter or dynamically emit data
739 depending on the request. e.g. you could advertise URLs for the closest
750 depending on the request. e.g. you could advertise URLs for the closest
740 data center given the client's IP address.
751 data center given the client's IP address.
741 """
752 """
742 return repo.vfs.tryread('clonebundles.manifest')
753 return repo.vfs.tryread('clonebundles.manifest')
743
754
744 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
755 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
745 'known', 'getbundle', 'unbundlehash', 'batch']
756 'known', 'getbundle', 'unbundlehash', 'batch']
746
757
747 def _capabilities(repo, proto):
758 def _capabilities(repo, proto):
748 """return a list of capabilities for a repo
759 """return a list of capabilities for a repo
749
760
750 This function exists to allow extensions to easily wrap capabilities
761 This function exists to allow extensions to easily wrap capabilities
751 computation
762 computation
752
763
753 - returns a lists: easy to alter
764 - returns a lists: easy to alter
754 - change done here will be propagated to both `capabilities` and `hello`
765 - change done here will be propagated to both `capabilities` and `hello`
755 command without any other action needed.
766 command without any other action needed.
756 """
767 """
757 # copy to prevent modification of the global list
768 # copy to prevent modification of the global list
758 caps = list(wireprotocaps)
769 caps = list(wireprotocaps)
759 if streamclone.allowservergeneration(repo):
770 if streamclone.allowservergeneration(repo):
760 if repo.ui.configbool('server', 'preferuncompressed'):
771 if repo.ui.configbool('server', 'preferuncompressed'):
761 caps.append('stream-preferred')
772 caps.append('stream-preferred')
762 requiredformats = repo.requirements & repo.supportedformats
773 requiredformats = repo.requirements & repo.supportedformats
763 # if our local revlogs are just revlogv1, add 'stream' cap
774 # if our local revlogs are just revlogv1, add 'stream' cap
764 if not requiredformats - {'revlogv1'}:
775 if not requiredformats - {'revlogv1'}:
765 caps.append('stream')
776 caps.append('stream')
766 # otherwise, add 'streamreqs' detailing our local revlog format
777 # otherwise, add 'streamreqs' detailing our local revlog format
767 else:
778 else:
768 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
779 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
769 if repo.ui.configbool('experimental', 'bundle2-advertise'):
780 if repo.ui.configbool('experimental', 'bundle2-advertise'):
770 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
781 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
771 caps.append('bundle2=' + urlreq.quote(capsblob))
782 caps.append('bundle2=' + urlreq.quote(capsblob))
772 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
783 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
773
784
774 if proto.name == 'http':
785 if proto.name == 'http':
775 caps.append('httpheader=%d' %
786 caps.append('httpheader=%d' %
776 repo.ui.configint('server', 'maxhttpheaderlen'))
787 repo.ui.configint('server', 'maxhttpheaderlen'))
777 if repo.ui.configbool('experimental', 'httppostargs'):
788 if repo.ui.configbool('experimental', 'httppostargs'):
778 caps.append('httppostargs')
789 caps.append('httppostargs')
779
790
780 # FUTURE advertise 0.2rx once support is implemented
791 # FUTURE advertise 0.2rx once support is implemented
781 # FUTURE advertise minrx and mintx after consulting config option
792 # FUTURE advertise minrx and mintx after consulting config option
782 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
793 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
783
794
784 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
795 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
785 if compengines:
796 if compengines:
786 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
797 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
787 for e in compengines)
798 for e in compengines)
788 caps.append('compression=%s' % comptypes)
799 caps.append('compression=%s' % comptypes)
789
800
790 return caps
801 return caps
791
802
792 # If you are writing an extension and consider wrapping this function. Wrap
803 # If you are writing an extension and consider wrapping this function. Wrap
793 # `_capabilities` instead.
804 # `_capabilities` instead.
794 @wireprotocommand('capabilities')
805 @wireprotocommand('capabilities')
795 def capabilities(repo, proto):
806 def capabilities(repo, proto):
796 return ' '.join(_capabilities(repo, proto))
807 return ' '.join(_capabilities(repo, proto))
797
808
798 @wireprotocommand('changegroup', 'roots')
809 @wireprotocommand('changegroup', 'roots')
799 def changegroup(repo, proto, roots):
810 def changegroup(repo, proto, roots):
800 nodes = decodelist(roots)
811 nodes = decodelist(roots)
801 outgoing = discovery.outgoing(repo, missingroots=nodes,
812 outgoing = discovery.outgoing(repo, missingroots=nodes,
802 missingheads=repo.heads())
813 missingheads=repo.heads())
803 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
814 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
804 gen = iter(lambda: cg.read(32768), '')
815 gen = iter(lambda: cg.read(32768), '')
805 return streamres(gen=gen, v1compressible=True)
816 return streamres(gen=gen)
806
817
807 @wireprotocommand('changegroupsubset', 'bases heads')
818 @wireprotocommand('changegroupsubset', 'bases heads')
808 def changegroupsubset(repo, proto, bases, heads):
819 def changegroupsubset(repo, proto, bases, heads):
809 bases = decodelist(bases)
820 bases = decodelist(bases)
810 heads = decodelist(heads)
821 heads = decodelist(heads)
811 outgoing = discovery.outgoing(repo, missingroots=bases,
822 outgoing = discovery.outgoing(repo, missingroots=bases,
812 missingheads=heads)
823 missingheads=heads)
813 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
824 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
814 gen = iter(lambda: cg.read(32768), '')
825 gen = iter(lambda: cg.read(32768), '')
815 return streamres(gen=gen, v1compressible=True)
826 return streamres(gen=gen)
816
827
817 @wireprotocommand('debugwireargs', 'one two *')
828 @wireprotocommand('debugwireargs', 'one two *')
818 def debugwireargs(repo, proto, one, two, others):
829 def debugwireargs(repo, proto, one, two, others):
819 # only accept optional args from the known set
830 # only accept optional args from the known set
820 opts = options('debugwireargs', ['three', 'four'], others)
831 opts = options('debugwireargs', ['three', 'four'], others)
821 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
832 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
822
833
823 @wireprotocommand('getbundle', '*')
834 @wireprotocommand('getbundle', '*')
824 def getbundle(repo, proto, others):
835 def getbundle(repo, proto, others):
825 opts = options('getbundle', gboptsmap.keys(), others)
836 opts = options('getbundle', gboptsmap.keys(), others)
826 for k, v in opts.iteritems():
837 for k, v in opts.iteritems():
827 keytype = gboptsmap[k]
838 keytype = gboptsmap[k]
828 if keytype == 'nodes':
839 if keytype == 'nodes':
829 opts[k] = decodelist(v)
840 opts[k] = decodelist(v)
830 elif keytype == 'csv':
841 elif keytype == 'csv':
831 opts[k] = list(v.split(','))
842 opts[k] = list(v.split(','))
832 elif keytype == 'scsv':
843 elif keytype == 'scsv':
833 opts[k] = set(v.split(','))
844 opts[k] = set(v.split(','))
834 elif keytype == 'boolean':
845 elif keytype == 'boolean':
835 # Client should serialize False as '0', which is a non-empty string
846 # Client should serialize False as '0', which is a non-empty string
836 # so it evaluates as a True bool.
847 # so it evaluates as a True bool.
837 if v == '0':
848 if v == '0':
838 opts[k] = False
849 opts[k] = False
839 else:
850 else:
840 opts[k] = bool(v)
851 opts[k] = bool(v)
841 elif keytype != 'plain':
852 elif keytype != 'plain':
842 raise KeyError('unknown getbundle option type %s'
853 raise KeyError('unknown getbundle option type %s'
843 % keytype)
854 % keytype)
844
855
845 if not bundle1allowed(repo, 'pull'):
856 if not bundle1allowed(repo, 'pull'):
846 if not exchange.bundle2requested(opts.get('bundlecaps')):
857 if not exchange.bundle2requested(opts.get('bundlecaps')):
847 if proto.name == 'http':
858 if proto.name == 'http':
848 return ooberror(bundle2required)
859 return ooberror(bundle2required)
849 raise error.Abort(bundle2requiredmain,
860 raise error.Abort(bundle2requiredmain,
850 hint=bundle2requiredhint)
861 hint=bundle2requiredhint)
851
862
852 try:
863 try:
853 if repo.ui.configbool('server', 'disablefullbundle'):
864 if repo.ui.configbool('server', 'disablefullbundle'):
854 # Check to see if this is a full clone.
865 # Check to see if this is a full clone.
855 clheads = set(repo.changelog.heads())
866 clheads = set(repo.changelog.heads())
856 heads = set(opts.get('heads', set()))
867 heads = set(opts.get('heads', set()))
857 common = set(opts.get('common', set()))
868 common = set(opts.get('common', set()))
858 common.discard(nullid)
869 common.discard(nullid)
859 if not common and clheads == heads:
870 if not common and clheads == heads:
860 raise error.Abort(
871 raise error.Abort(
861 _('server has pull-based clones disabled'),
872 _('server has pull-based clones disabled'),
862 hint=_('remove --pull if specified or upgrade Mercurial'))
873 hint=_('remove --pull if specified or upgrade Mercurial'))
863
874
864 chunks = exchange.getbundlechunks(repo, 'serve',
875 chunks = exchange.getbundlechunks(repo, 'serve',
865 **pycompat.strkwargs(opts))
876 **pycompat.strkwargs(opts))
866 except error.Abort as exc:
877 except error.Abort as exc:
867 # cleanly forward Abort error to the client
878 # cleanly forward Abort error to the client
868 if not exchange.bundle2requested(opts.get('bundlecaps')):
879 if not exchange.bundle2requested(opts.get('bundlecaps')):
869 if proto.name == 'http':
880 if proto.name == 'http':
870 return ooberror(str(exc) + '\n')
881 return ooberror(str(exc) + '\n')
871 raise # cannot do better for bundle1 + ssh
882 raise # cannot do better for bundle1 + ssh
872 # bundle2 request expect a bundle2 reply
883 # bundle2 request expect a bundle2 reply
873 bundler = bundle2.bundle20(repo.ui)
884 bundler = bundle2.bundle20(repo.ui)
874 manargs = [('message', str(exc))]
885 manargs = [('message', str(exc))]
875 advargs = []
886 advargs = []
876 if exc.hint is not None:
887 if exc.hint is not None:
877 advargs.append(('hint', exc.hint))
888 advargs.append(('hint', exc.hint))
878 bundler.addpart(bundle2.bundlepart('error:abort',
889 bundler.addpart(bundle2.bundlepart('error:abort',
879 manargs, advargs))
890 manargs, advargs))
880 return streamres(gen=bundler.getchunks(), v1compressible=True)
891 return streamres(gen=bundler.getchunks())
881 return streamres(gen=chunks, v1compressible=True)
892 return streamres(gen=chunks)
882
893
883 @wireprotocommand('heads')
894 @wireprotocommand('heads')
884 def heads(repo, proto):
895 def heads(repo, proto):
885 h = repo.heads()
896 h = repo.heads()
886 return encodelist(h) + "\n"
897 return encodelist(h) + "\n"
887
898
888 @wireprotocommand('hello')
899 @wireprotocommand('hello')
889 def hello(repo, proto):
900 def hello(repo, proto):
890 '''the hello command returns a set of lines describing various
901 '''the hello command returns a set of lines describing various
891 interesting things about the server, in an RFC822-like format.
902 interesting things about the server, in an RFC822-like format.
892 Currently the only one defined is "capabilities", which
903 Currently the only one defined is "capabilities", which
893 consists of a line in the form:
904 consists of a line in the form:
894
905
895 capabilities: space separated list of tokens
906 capabilities: space separated list of tokens
896 '''
907 '''
897 return "capabilities: %s\n" % (capabilities(repo, proto))
908 return "capabilities: %s\n" % (capabilities(repo, proto))
898
909
899 @wireprotocommand('listkeys', 'namespace')
910 @wireprotocommand('listkeys', 'namespace')
900 def listkeys(repo, proto, namespace):
911 def listkeys(repo, proto, namespace):
901 d = repo.listkeys(encoding.tolocal(namespace)).items()
912 d = repo.listkeys(encoding.tolocal(namespace)).items()
902 return pushkeymod.encodekeys(d)
913 return pushkeymod.encodekeys(d)
903
914
904 @wireprotocommand('lookup', 'key')
915 @wireprotocommand('lookup', 'key')
905 def lookup(repo, proto, key):
916 def lookup(repo, proto, key):
906 try:
917 try:
907 k = encoding.tolocal(key)
918 k = encoding.tolocal(key)
908 c = repo[k]
919 c = repo[k]
909 r = c.hex()
920 r = c.hex()
910 success = 1
921 success = 1
911 except Exception as inst:
922 except Exception as inst:
912 r = str(inst)
923 r = str(inst)
913 success = 0
924 success = 0
914 return "%d %s\n" % (success, r)
925 return "%d %s\n" % (success, r)
915
926
916 @wireprotocommand('known', 'nodes *')
927 @wireprotocommand('known', 'nodes *')
917 def known(repo, proto, nodes, others):
928 def known(repo, proto, nodes, others):
918 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
929 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
919
930
920 @wireprotocommand('pushkey', 'namespace key old new')
931 @wireprotocommand('pushkey', 'namespace key old new')
921 def pushkey(repo, proto, namespace, key, old, new):
932 def pushkey(repo, proto, namespace, key, old, new):
922 # compatibility with pre-1.8 clients which were accidentally
933 # compatibility with pre-1.8 clients which were accidentally
923 # sending raw binary nodes rather than utf-8-encoded hex
934 # sending raw binary nodes rather than utf-8-encoded hex
924 if len(new) == 20 and util.escapestr(new) != new:
935 if len(new) == 20 and util.escapestr(new) != new:
925 # looks like it could be a binary node
936 # looks like it could be a binary node
926 try:
937 try:
927 new.decode('utf-8')
938 new.decode('utf-8')
928 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
939 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
929 except UnicodeDecodeError:
940 except UnicodeDecodeError:
930 pass # binary, leave unmodified
941 pass # binary, leave unmodified
931 else:
942 else:
932 new = encoding.tolocal(new) # normal path
943 new = encoding.tolocal(new) # normal path
933
944
934 if util.safehasattr(proto, 'restore'):
945 if util.safehasattr(proto, 'restore'):
935
946
936 proto.redirect()
947 proto.redirect()
937
948
938 try:
949 try:
939 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
950 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
940 encoding.tolocal(old), new) or False
951 encoding.tolocal(old), new) or False
941 except error.Abort:
952 except error.Abort:
942 r = False
953 r = False
943
954
944 output = proto.restore()
955 output = proto.restore()
945
956
946 return '%s\n%s' % (int(r), output)
957 return '%s\n%s' % (int(r), output)
947
958
948 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
959 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
949 encoding.tolocal(old), new)
960 encoding.tolocal(old), new)
950 return '%s\n' % int(r)
961 return '%s\n' % int(r)
951
962
952 @wireprotocommand('stream_out')
963 @wireprotocommand('stream_out')
953 def stream(repo, proto):
964 def stream(repo, proto):
954 '''If the server supports streaming clone, it advertises the "stream"
965 '''If the server supports streaming clone, it advertises the "stream"
955 capability with a value representing the version and flags of the repo
966 capability with a value representing the version and flags of the repo
956 it is serving. Client checks to see if it understands the format.
967 it is serving. Client checks to see if it understands the format.
957 '''
968 '''
958 return streamres(streamclone.generatev1wireproto(repo))
969 return streamres_legacy(streamclone.generatev1wireproto(repo))
959
970
960 @wireprotocommand('unbundle', 'heads')
971 @wireprotocommand('unbundle', 'heads')
961 def unbundle(repo, proto, heads):
972 def unbundle(repo, proto, heads):
962 their_heads = decodelist(heads)
973 their_heads = decodelist(heads)
963
974
964 try:
975 try:
965 proto.redirect()
976 proto.redirect()
966
977
967 exchange.check_heads(repo, their_heads, 'preparing changes')
978 exchange.check_heads(repo, their_heads, 'preparing changes')
968
979
969 # write bundle data to temporary file because it can be big
980 # write bundle data to temporary file because it can be big
970 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
981 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
971 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
982 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
972 r = 0
983 r = 0
973 try:
984 try:
974 proto.getfile(fp)
985 proto.getfile(fp)
975 fp.seek(0)
986 fp.seek(0)
976 gen = exchange.readbundle(repo.ui, fp, None)
987 gen = exchange.readbundle(repo.ui, fp, None)
977 if (isinstance(gen, changegroupmod.cg1unpacker)
988 if (isinstance(gen, changegroupmod.cg1unpacker)
978 and not bundle1allowed(repo, 'push')):
989 and not bundle1allowed(repo, 'push')):
979 if proto.name == 'http':
990 if proto.name == 'http':
980 # need to special case http because stderr do not get to
991 # need to special case http because stderr do not get to
981 # the http client on failed push so we need to abuse some
992 # the http client on failed push so we need to abuse some
982 # other error type to make sure the message get to the
993 # other error type to make sure the message get to the
983 # user.
994 # user.
984 return ooberror(bundle2required)
995 return ooberror(bundle2required)
985 raise error.Abort(bundle2requiredmain,
996 raise error.Abort(bundle2requiredmain,
986 hint=bundle2requiredhint)
997 hint=bundle2requiredhint)
987
998
988 r = exchange.unbundle(repo, gen, their_heads, 'serve',
999 r = exchange.unbundle(repo, gen, their_heads, 'serve',
989 proto._client())
1000 proto._client())
990 if util.safehasattr(r, 'addpart'):
1001 if util.safehasattr(r, 'addpart'):
991 # The return looks streamable, we are in the bundle2 case and
1002 # The return looks streamable, we are in the bundle2 case and
992 # should return a stream.
1003 # should return a stream.
993 return streamres(gen=r.getchunks())
1004 return streamres_legacy(gen=r.getchunks())
994 return pushres(r)
1005 return pushres(r)
995
1006
996 finally:
1007 finally:
997 fp.close()
1008 fp.close()
998 os.unlink(tempname)
1009 os.unlink(tempname)
999
1010
1000 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1011 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1001 # handle non-bundle2 case first
1012 # handle non-bundle2 case first
1002 if not getattr(exc, 'duringunbundle2', False):
1013 if not getattr(exc, 'duringunbundle2', False):
1003 try:
1014 try:
1004 raise
1015 raise
1005 except error.Abort:
1016 except error.Abort:
1006 # The old code we moved used util.stderr directly.
1017 # The old code we moved used util.stderr directly.
1007 # We did not change it to minimise code change.
1018 # We did not change it to minimise code change.
1008 # This need to be moved to something proper.
1019 # This need to be moved to something proper.
1009 # Feel free to do it.
1020 # Feel free to do it.
1010 util.stderr.write("abort: %s\n" % exc)
1021 util.stderr.write("abort: %s\n" % exc)
1011 if exc.hint is not None:
1022 if exc.hint is not None:
1012 util.stderr.write("(%s)\n" % exc.hint)
1023 util.stderr.write("(%s)\n" % exc.hint)
1013 return pushres(0)
1024 return pushres(0)
1014 except error.PushRaced:
1025 except error.PushRaced:
1015 return pusherr(str(exc))
1026 return pusherr(str(exc))
1016
1027
1017 bundler = bundle2.bundle20(repo.ui)
1028 bundler = bundle2.bundle20(repo.ui)
1018 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1029 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1019 bundler.addpart(out)
1030 bundler.addpart(out)
1020 try:
1031 try:
1021 try:
1032 try:
1022 raise
1033 raise
1023 except error.PushkeyFailed as exc:
1034 except error.PushkeyFailed as exc:
1024 # check client caps
1035 # check client caps
1025 remotecaps = getattr(exc, '_replycaps', None)
1036 remotecaps = getattr(exc, '_replycaps', None)
1026 if (remotecaps is not None
1037 if (remotecaps is not None
1027 and 'pushkey' not in remotecaps.get('error', ())):
1038 and 'pushkey' not in remotecaps.get('error', ())):
1028 # no support remote side, fallback to Abort handler.
1039 # no support remote side, fallback to Abort handler.
1029 raise
1040 raise
1030 part = bundler.newpart('error:pushkey')
1041 part = bundler.newpart('error:pushkey')
1031 part.addparam('in-reply-to', exc.partid)
1042 part.addparam('in-reply-to', exc.partid)
1032 if exc.namespace is not None:
1043 if exc.namespace is not None:
1033 part.addparam('namespace', exc.namespace, mandatory=False)
1044 part.addparam('namespace', exc.namespace, mandatory=False)
1034 if exc.key is not None:
1045 if exc.key is not None:
1035 part.addparam('key', exc.key, mandatory=False)
1046 part.addparam('key', exc.key, mandatory=False)
1036 if exc.new is not None:
1047 if exc.new is not None:
1037 part.addparam('new', exc.new, mandatory=False)
1048 part.addparam('new', exc.new, mandatory=False)
1038 if exc.old is not None:
1049 if exc.old is not None:
1039 part.addparam('old', exc.old, mandatory=False)
1050 part.addparam('old', exc.old, mandatory=False)
1040 if exc.ret is not None:
1051 if exc.ret is not None:
1041 part.addparam('ret', exc.ret, mandatory=False)
1052 part.addparam('ret', exc.ret, mandatory=False)
1042 except error.BundleValueError as exc:
1053 except error.BundleValueError as exc:
1043 errpart = bundler.newpart('error:unsupportedcontent')
1054 errpart = bundler.newpart('error:unsupportedcontent')
1044 if exc.parttype is not None:
1055 if exc.parttype is not None:
1045 errpart.addparam('parttype', exc.parttype)
1056 errpart.addparam('parttype', exc.parttype)
1046 if exc.params:
1057 if exc.params:
1047 errpart.addparam('params', '\0'.join(exc.params))
1058 errpart.addparam('params', '\0'.join(exc.params))
1048 except error.Abort as exc:
1059 except error.Abort as exc:
1049 manargs = [('message', str(exc))]
1060 manargs = [('message', str(exc))]
1050 advargs = []
1061 advargs = []
1051 if exc.hint is not None:
1062 if exc.hint is not None:
1052 advargs.append(('hint', exc.hint))
1063 advargs.append(('hint', exc.hint))
1053 bundler.addpart(bundle2.bundlepart('error:abort',
1064 bundler.addpart(bundle2.bundlepart('error:abort',
1054 manargs, advargs))
1065 manargs, advargs))
1055 except error.PushRaced as exc:
1066 except error.PushRaced as exc:
1056 bundler.newpart('error:pushraced', [('message', str(exc))])
1067 bundler.newpart('error:pushraced', [('message', str(exc))])
1057 return streamres(gen=bundler.getchunks())
1068 return streamres_legacy(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now