##// END OF EJS Templates
wireproto: perform chunking and compression at protocol layer (API)...
Gregory Szorc -
r30466:2add671b default
parent child Browse files
Show More
@@ -1,189 +1,189
1 # Copyright 2011 Fog Creek Software
1 # Copyright 2011 Fog Creek Software
2 #
2 #
3 # This software may be used and distributed according to the terms of the
3 # This software may be used and distributed according to the terms of the
4 # GNU General Public License version 2 or any later version.
4 # GNU General Public License version 2 or any later version.
5 from __future__ import absolute_import
5 from __future__ import absolute_import
6
6
7 import os
7 import os
8 import re
8 import re
9
9
10 from mercurial.i18n import _
10 from mercurial.i18n import _
11
11
12 from mercurial import (
12 from mercurial import (
13 error,
13 error,
14 httppeer,
14 httppeer,
15 util,
15 util,
16 wireproto,
16 wireproto,
17 )
17 )
18
18
19 from . import (
19 from . import (
20 lfutil,
20 lfutil,
21 )
21 )
22
22
23 urlerr = util.urlerr
23 urlerr = util.urlerr
24 urlreq = util.urlreq
24 urlreq = util.urlreq
25
25
26 LARGEFILES_REQUIRED_MSG = ('\nThis repository uses the largefiles extension.'
26 LARGEFILES_REQUIRED_MSG = ('\nThis repository uses the largefiles extension.'
27 '\n\nPlease enable it in your Mercurial config '
27 '\n\nPlease enable it in your Mercurial config '
28 'file.\n')
28 'file.\n')
29
29
30 # these will all be replaced by largefiles.uisetup
30 # these will all be replaced by largefiles.uisetup
31 capabilitiesorig = None
31 capabilitiesorig = None
32 ssholdcallstream = None
32 ssholdcallstream = None
33 httpoldcallstream = None
33 httpoldcallstream = None
34
34
35 def putlfile(repo, proto, sha):
35 def putlfile(repo, proto, sha):
36 '''Server command for putting a largefile into a repository's local store
36 '''Server command for putting a largefile into a repository's local store
37 and into the user cache.'''
37 and into the user cache.'''
38 proto.redirect()
38 proto.redirect()
39
39
40 path = lfutil.storepath(repo, sha)
40 path = lfutil.storepath(repo, sha)
41 util.makedirs(os.path.dirname(path))
41 util.makedirs(os.path.dirname(path))
42 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
42 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
43
43
44 try:
44 try:
45 proto.getfile(tmpfp)
45 proto.getfile(tmpfp)
46 tmpfp._fp.seek(0)
46 tmpfp._fp.seek(0)
47 if sha != lfutil.hexsha1(tmpfp._fp):
47 if sha != lfutil.hexsha1(tmpfp._fp):
48 raise IOError(0, _('largefile contents do not match hash'))
48 raise IOError(0, _('largefile contents do not match hash'))
49 tmpfp.close()
49 tmpfp.close()
50 lfutil.linktousercache(repo, sha)
50 lfutil.linktousercache(repo, sha)
51 except IOError as e:
51 except IOError as e:
52 repo.ui.warn(_('largefiles: failed to put %s into store: %s\n') %
52 repo.ui.warn(_('largefiles: failed to put %s into store: %s\n') %
53 (sha, e.strerror))
53 (sha, e.strerror))
54 return wireproto.pushres(1)
54 return wireproto.pushres(1)
55 finally:
55 finally:
56 tmpfp.discard()
56 tmpfp.discard()
57
57
58 return wireproto.pushres(0)
58 return wireproto.pushres(0)
59
59
60 def getlfile(repo, proto, sha):
60 def getlfile(repo, proto, sha):
61 '''Server command for retrieving a largefile from the repository-local
61 '''Server command for retrieving a largefile from the repository-local
62 cache or user cache.'''
62 cache or user cache.'''
63 filename = lfutil.findfile(repo, sha)
63 filename = lfutil.findfile(repo, sha)
64 if not filename:
64 if not filename:
65 raise error.Abort(_('requested largefile %s not present in cache')
65 raise error.Abort(_('requested largefile %s not present in cache')
66 % sha)
66 % sha)
67 f = open(filename, 'rb')
67 f = open(filename, 'rb')
68 length = os.fstat(f.fileno())[6]
68 length = os.fstat(f.fileno())[6]
69
69
70 # Since we can't set an HTTP content-length header here, and
70 # Since we can't set an HTTP content-length header here, and
71 # Mercurial core provides no way to give the length of a streamres
71 # Mercurial core provides no way to give the length of a streamres
72 # (and reading the entire file into RAM would be ill-advised), we
72 # (and reading the entire file into RAM would be ill-advised), we
73 # just send the length on the first line of the response, like the
73 # just send the length on the first line of the response, like the
74 # ssh proto does for string responses.
74 # ssh proto does for string responses.
75 def generator():
75 def generator():
76 yield '%d\n' % length
76 yield '%d\n' % length
77 for chunk in util.filechunkiter(f):
77 for chunk in util.filechunkiter(f):
78 yield chunk
78 yield chunk
79 return wireproto.streamres(generator())
79 return wireproto.streamres(gen=generator())
80
80
81 def statlfile(repo, proto, sha):
81 def statlfile(repo, proto, sha):
82 '''Server command for checking if a largefile is present - returns '2\n' if
82 '''Server command for checking if a largefile is present - returns '2\n' if
83 the largefile is missing, '0\n' if it seems to be in good condition.
83 the largefile is missing, '0\n' if it seems to be in good condition.
84
84
85 The value 1 is reserved for mismatched checksum, but that is too expensive
85 The value 1 is reserved for mismatched checksum, but that is too expensive
86 to be verified on every stat and must be caught be running 'hg verify'
86 to be verified on every stat and must be caught be running 'hg verify'
87 server side.'''
87 server side.'''
88 filename = lfutil.findfile(repo, sha)
88 filename = lfutil.findfile(repo, sha)
89 if not filename:
89 if not filename:
90 return '2\n'
90 return '2\n'
91 return '0\n'
91 return '0\n'
92
92
93 def wirereposetup(ui, repo):
93 def wirereposetup(ui, repo):
94 class lfileswirerepository(repo.__class__):
94 class lfileswirerepository(repo.__class__):
95 def putlfile(self, sha, fd):
95 def putlfile(self, sha, fd):
96 # unfortunately, httprepository._callpush tries to convert its
96 # unfortunately, httprepository._callpush tries to convert its
97 # input file-like into a bundle before sending it, so we can't use
97 # input file-like into a bundle before sending it, so we can't use
98 # it ...
98 # it ...
99 if issubclass(self.__class__, httppeer.httppeer):
99 if issubclass(self.__class__, httppeer.httppeer):
100 res = self._call('putlfile', data=fd, sha=sha,
100 res = self._call('putlfile', data=fd, sha=sha,
101 headers={'content-type':'application/mercurial-0.1'})
101 headers={'content-type':'application/mercurial-0.1'})
102 try:
102 try:
103 d, output = res.split('\n', 1)
103 d, output = res.split('\n', 1)
104 for l in output.splitlines(True):
104 for l in output.splitlines(True):
105 self.ui.warn(_('remote: '), l) # assume l ends with \n
105 self.ui.warn(_('remote: '), l) # assume l ends with \n
106 return int(d)
106 return int(d)
107 except ValueError:
107 except ValueError:
108 self.ui.warn(_('unexpected putlfile response: %r\n') % res)
108 self.ui.warn(_('unexpected putlfile response: %r\n') % res)
109 return 1
109 return 1
110 # ... but we can't use sshrepository._call because the data=
110 # ... but we can't use sshrepository._call because the data=
111 # argument won't get sent, and _callpush does exactly what we want
111 # argument won't get sent, and _callpush does exactly what we want
112 # in this case: send the data straight through
112 # in this case: send the data straight through
113 else:
113 else:
114 try:
114 try:
115 ret, output = self._callpush("putlfile", fd, sha=sha)
115 ret, output = self._callpush("putlfile", fd, sha=sha)
116 if ret == "":
116 if ret == "":
117 raise error.ResponseError(_('putlfile failed:'),
117 raise error.ResponseError(_('putlfile failed:'),
118 output)
118 output)
119 return int(ret)
119 return int(ret)
120 except IOError:
120 except IOError:
121 return 1
121 return 1
122 except ValueError:
122 except ValueError:
123 raise error.ResponseError(
123 raise error.ResponseError(
124 _('putlfile failed (unexpected response):'), ret)
124 _('putlfile failed (unexpected response):'), ret)
125
125
126 def getlfile(self, sha):
126 def getlfile(self, sha):
127 """returns an iterable with the chunks of the file with sha sha"""
127 """returns an iterable with the chunks of the file with sha sha"""
128 stream = self._callstream("getlfile", sha=sha)
128 stream = self._callstream("getlfile", sha=sha)
129 length = stream.readline()
129 length = stream.readline()
130 try:
130 try:
131 length = int(length)
131 length = int(length)
132 except ValueError:
132 except ValueError:
133 self._abort(error.ResponseError(_("unexpected response:"),
133 self._abort(error.ResponseError(_("unexpected response:"),
134 length))
134 length))
135
135
136 # SSH streams will block if reading more than length
136 # SSH streams will block if reading more than length
137 for chunk in util.filechunkiter(stream, limit=length):
137 for chunk in util.filechunkiter(stream, limit=length):
138 yield chunk
138 yield chunk
139 # HTTP streams must hit the end to process the last empty
139 # HTTP streams must hit the end to process the last empty
140 # chunk of Chunked-Encoding so the connection can be reused.
140 # chunk of Chunked-Encoding so the connection can be reused.
141 if issubclass(self.__class__, httppeer.httppeer):
141 if issubclass(self.__class__, httppeer.httppeer):
142 chunk = stream.read(1)
142 chunk = stream.read(1)
143 if chunk:
143 if chunk:
144 self._abort(error.ResponseError(_("unexpected response:"),
144 self._abort(error.ResponseError(_("unexpected response:"),
145 chunk))
145 chunk))
146
146
147 @wireproto.batchable
147 @wireproto.batchable
148 def statlfile(self, sha):
148 def statlfile(self, sha):
149 f = wireproto.future()
149 f = wireproto.future()
150 result = {'sha': sha}
150 result = {'sha': sha}
151 yield result, f
151 yield result, f
152 try:
152 try:
153 yield int(f.value)
153 yield int(f.value)
154 except (ValueError, urlerr.httperror):
154 except (ValueError, urlerr.httperror):
155 # If the server returns anything but an integer followed by a
155 # If the server returns anything but an integer followed by a
156 # newline, newline, it's not speaking our language; if we get
156 # newline, newline, it's not speaking our language; if we get
157 # an HTTP error, we can't be sure the largefile is present;
157 # an HTTP error, we can't be sure the largefile is present;
158 # either way, consider it missing.
158 # either way, consider it missing.
159 yield 2
159 yield 2
160
160
161 repo.__class__ = lfileswirerepository
161 repo.__class__ = lfileswirerepository
162
162
163 # advertise the largefiles=serve capability
163 # advertise the largefiles=serve capability
164 def capabilities(repo, proto):
164 def capabilities(repo, proto):
165 '''Wrap server command to announce largefile server capability'''
165 '''Wrap server command to announce largefile server capability'''
166 return capabilitiesorig(repo, proto) + ' largefiles=serve'
166 return capabilitiesorig(repo, proto) + ' largefiles=serve'
167
167
168 def heads(repo, proto):
168 def heads(repo, proto):
169 '''Wrap server command - largefile capable clients will know to call
169 '''Wrap server command - largefile capable clients will know to call
170 lheads instead'''
170 lheads instead'''
171 if lfutil.islfilesrepo(repo):
171 if lfutil.islfilesrepo(repo):
172 return wireproto.ooberror(LARGEFILES_REQUIRED_MSG)
172 return wireproto.ooberror(LARGEFILES_REQUIRED_MSG)
173 return wireproto.heads(repo, proto)
173 return wireproto.heads(repo, proto)
174
174
175 def sshrepocallstream(self, cmd, **args):
175 def sshrepocallstream(self, cmd, **args):
176 if cmd == 'heads' and self.capable('largefiles'):
176 if cmd == 'heads' and self.capable('largefiles'):
177 cmd = 'lheads'
177 cmd = 'lheads'
178 if cmd == 'batch' and self.capable('largefiles'):
178 if cmd == 'batch' and self.capable('largefiles'):
179 args['cmds'] = args['cmds'].replace('heads ', 'lheads ')
179 args['cmds'] = args['cmds'].replace('heads ', 'lheads ')
180 return ssholdcallstream(self, cmd, **args)
180 return ssholdcallstream(self, cmd, **args)
181
181
182 headsre = re.compile(r'(^|;)heads\b')
182 headsre = re.compile(r'(^|;)heads\b')
183
183
184 def httprepocallstream(self, cmd, **args):
184 def httprepocallstream(self, cmd, **args):
185 if cmd == 'heads' and self.capable('largefiles'):
185 if cmd == 'heads' and self.capable('largefiles'):
186 cmd = 'lheads'
186 cmd = 'lheads'
187 if cmd == 'batch' and self.capable('largefiles'):
187 if cmd == 'batch' and self.capable('largefiles'):
188 args['cmds'] = headsre.sub('lheads', args['cmds'])
188 args['cmds'] = headsre.sub('lheads', args['cmds'])
189 return httpoldcallstream(self, cmd, **args)
189 return httpoldcallstream(self, cmd, **args)
@@ -1,126 +1,124
1 #
1 #
2 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import cgi
10 import cgi
11
11
12 from .common import (
12 from .common import (
13 HTTP_OK,
13 HTTP_OK,
14 )
14 )
15
15
16 from .. import (
16 from .. import (
17 util,
17 util,
18 wireproto,
18 wireproto,
19 )
19 )
20 stringio = util.stringio
20 stringio = util.stringio
21
21
22 urlerr = util.urlerr
22 urlerr = util.urlerr
23 urlreq = util.urlreq
23 urlreq = util.urlreq
24
24
25 HGTYPE = 'application/mercurial-0.1'
25 HGTYPE = 'application/mercurial-0.1'
26 HGERRTYPE = 'application/hg-error'
26 HGERRTYPE = 'application/hg-error'
27
27
28 class webproto(wireproto.abstractserverproto):
28 class webproto(wireproto.abstractserverproto):
29 def __init__(self, req, ui):
29 def __init__(self, req, ui):
30 self.req = req
30 self.req = req
31 self.response = ''
31 self.response = ''
32 self.ui = ui
32 self.ui = ui
33 def getargs(self, args):
33 def getargs(self, args):
34 knownargs = self._args()
34 knownargs = self._args()
35 data = {}
35 data = {}
36 keys = args.split()
36 keys = args.split()
37 for k in keys:
37 for k in keys:
38 if k == '*':
38 if k == '*':
39 star = {}
39 star = {}
40 for key in knownargs.keys():
40 for key in knownargs.keys():
41 if key != 'cmd' and key not in keys:
41 if key != 'cmd' and key not in keys:
42 star[key] = knownargs[key][0]
42 star[key] = knownargs[key][0]
43 data['*'] = star
43 data['*'] = star
44 else:
44 else:
45 data[k] = knownargs[k][0]
45 data[k] = knownargs[k][0]
46 return [data[k] for k in keys]
46 return [data[k] for k in keys]
47 def _args(self):
47 def _args(self):
48 args = self.req.form.copy()
48 args = self.req.form.copy()
49 postlen = int(self.req.env.get('HTTP_X_HGARGS_POST', 0))
49 postlen = int(self.req.env.get('HTTP_X_HGARGS_POST', 0))
50 if postlen:
50 if postlen:
51 args.update(cgi.parse_qs(
51 args.update(cgi.parse_qs(
52 self.req.read(postlen), keep_blank_values=True))
52 self.req.read(postlen), keep_blank_values=True))
53 return args
53 return args
54 chunks = []
54 chunks = []
55 i = 1
55 i = 1
56 while True:
56 while True:
57 h = self.req.env.get('HTTP_X_HGARG_' + str(i))
57 h = self.req.env.get('HTTP_X_HGARG_' + str(i))
58 if h is None:
58 if h is None:
59 break
59 break
60 chunks += [h]
60 chunks += [h]
61 i += 1
61 i += 1
62 args.update(cgi.parse_qs(''.join(chunks), keep_blank_values=True))
62 args.update(cgi.parse_qs(''.join(chunks), keep_blank_values=True))
63 return args
63 return args
64 def getfile(self, fp):
64 def getfile(self, fp):
65 length = int(self.req.env['CONTENT_LENGTH'])
65 length = int(self.req.env['CONTENT_LENGTH'])
66 for s in util.filechunkiter(self.req, limit=length):
66 for s in util.filechunkiter(self.req, limit=length):
67 fp.write(s)
67 fp.write(s)
68 def redirect(self):
68 def redirect(self):
69 self.oldio = self.ui.fout, self.ui.ferr
69 self.oldio = self.ui.fout, self.ui.ferr
70 self.ui.ferr = self.ui.fout = stringio()
70 self.ui.ferr = self.ui.fout = stringio()
71 def restore(self):
71 def restore(self):
72 val = self.ui.fout.getvalue()
72 val = self.ui.fout.getvalue()
73 self.ui.ferr, self.ui.fout = self.oldio
73 self.ui.ferr, self.ui.fout = self.oldio
74 return val
74 return val
75
75
76 def groupchunks(self, fh):
77 def getchunks():
78 while True:
79 chunk = fh.read(32768)
80 if not chunk:
81 break
82 yield chunk
83
84 return self.compresschunks(getchunks())
85
86 def compresschunks(self, chunks):
76 def compresschunks(self, chunks):
87 # Don't allow untrusted settings because disabling compression or
77 # Don't allow untrusted settings because disabling compression or
88 # setting a very high compression level could lead to flooding
78 # setting a very high compression level could lead to flooding
89 # the server's network or CPU.
79 # the server's network or CPU.
90 opts = {'level': self.ui.configint('server', 'zliblevel', -1)}
80 opts = {'level': self.ui.configint('server', 'zliblevel', -1)}
91 return util.compengines['zlib'].compressstream(chunks, opts)
81 return util.compengines['zlib'].compressstream(chunks, opts)
92
82
93 def _client(self):
83 def _client(self):
94 return 'remote:%s:%s:%s' % (
84 return 'remote:%s:%s:%s' % (
95 self.req.env.get('wsgi.url_scheme') or 'http',
85 self.req.env.get('wsgi.url_scheme') or 'http',
96 urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
86 urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
97 urlreq.quote(self.req.env.get('REMOTE_USER', '')))
87 urlreq.quote(self.req.env.get('REMOTE_USER', '')))
98
88
99 def iscmd(cmd):
89 def iscmd(cmd):
100 return cmd in wireproto.commands
90 return cmd in wireproto.commands
101
91
102 def call(repo, req, cmd):
92 def call(repo, req, cmd):
103 p = webproto(req, repo.ui)
93 p = webproto(req, repo.ui)
104 rsp = wireproto.dispatch(repo, p, cmd)
94 rsp = wireproto.dispatch(repo, p, cmd)
105 if isinstance(rsp, str):
95 if isinstance(rsp, str):
106 req.respond(HTTP_OK, HGTYPE, body=rsp)
96 req.respond(HTTP_OK, HGTYPE, body=rsp)
107 return []
97 return []
108 elif isinstance(rsp, wireproto.streamres):
98 elif isinstance(rsp, wireproto.streamres):
99 if rsp.reader:
100 gen = iter(lambda: rsp.reader.read(32768), '')
101 else:
102 gen = rsp.gen
103
104 if rsp.v1compressible:
105 gen = p.compresschunks(gen)
106
109 req.respond(HTTP_OK, HGTYPE)
107 req.respond(HTTP_OK, HGTYPE)
110 return rsp.gen
108 return gen
111 elif isinstance(rsp, wireproto.pushres):
109 elif isinstance(rsp, wireproto.pushres):
112 val = p.restore()
110 val = p.restore()
113 rsp = '%d\n%s' % (rsp.res, val)
111 rsp = '%d\n%s' % (rsp.res, val)
114 req.respond(HTTP_OK, HGTYPE, body=rsp)
112 req.respond(HTTP_OK, HGTYPE, body=rsp)
115 return []
113 return []
116 elif isinstance(rsp, wireproto.pusherr):
114 elif isinstance(rsp, wireproto.pusherr):
117 # drain the incoming bundle
115 # drain the incoming bundle
118 req.drain()
116 req.drain()
119 p.restore()
117 p.restore()
120 rsp = '0\n%s\n' % rsp.res
118 rsp = '0\n%s\n' % rsp.res
121 req.respond(HTTP_OK, HGTYPE, body=rsp)
119 req.respond(HTTP_OK, HGTYPE, body=rsp)
122 return []
120 return []
123 elif isinstance(rsp, wireproto.ooberror):
121 elif isinstance(rsp, wireproto.ooberror):
124 rsp = rsp.message
122 rsp = rsp.message
125 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
123 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
126 return []
124 return []
@@ -1,135 +1,134
1 # sshserver.py - ssh protocol server support for mercurial
1 # sshserver.py - ssh protocol server support for mercurial
2 #
2 #
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from __future__ import absolute_import
9 from __future__ import absolute_import
10
10
11 import os
11 import os
12 import sys
12 import sys
13
13
14 from .i18n import _
14 from .i18n import _
15 from . import (
15 from . import (
16 error,
16 error,
17 hook,
17 hook,
18 util,
18 util,
19 wireproto,
19 wireproto,
20 )
20 )
21
21
22 class sshserver(wireproto.abstractserverproto):
22 class sshserver(wireproto.abstractserverproto):
23 def __init__(self, ui, repo):
23 def __init__(self, ui, repo):
24 self.ui = ui
24 self.ui = ui
25 self.repo = repo
25 self.repo = repo
26 self.lock = None
26 self.lock = None
27 self.fin = ui.fin
27 self.fin = ui.fin
28 self.fout = ui.fout
28 self.fout = ui.fout
29
29
30 hook.redirect(True)
30 hook.redirect(True)
31 ui.fout = repo.ui.fout = ui.ferr
31 ui.fout = repo.ui.fout = ui.ferr
32
32
33 # Prevent insertion/deletion of CRs
33 # Prevent insertion/deletion of CRs
34 util.setbinary(self.fin)
34 util.setbinary(self.fin)
35 util.setbinary(self.fout)
35 util.setbinary(self.fout)
36
36
37 def getargs(self, args):
37 def getargs(self, args):
38 data = {}
38 data = {}
39 keys = args.split()
39 keys = args.split()
40 for n in xrange(len(keys)):
40 for n in xrange(len(keys)):
41 argline = self.fin.readline()[:-1]
41 argline = self.fin.readline()[:-1]
42 arg, l = argline.split()
42 arg, l = argline.split()
43 if arg not in keys:
43 if arg not in keys:
44 raise error.Abort(_("unexpected parameter %r") % arg)
44 raise error.Abort(_("unexpected parameter %r") % arg)
45 if arg == '*':
45 if arg == '*':
46 star = {}
46 star = {}
47 for k in xrange(int(l)):
47 for k in xrange(int(l)):
48 argline = self.fin.readline()[:-1]
48 argline = self.fin.readline()[:-1]
49 arg, l = argline.split()
49 arg, l = argline.split()
50 val = self.fin.read(int(l))
50 val = self.fin.read(int(l))
51 star[arg] = val
51 star[arg] = val
52 data['*'] = star
52 data['*'] = star
53 else:
53 else:
54 val = self.fin.read(int(l))
54 val = self.fin.read(int(l))
55 data[arg] = val
55 data[arg] = val
56 return [data[k] for k in keys]
56 return [data[k] for k in keys]
57
57
58 def getarg(self, name):
58 def getarg(self, name):
59 return self.getargs(name)[0]
59 return self.getargs(name)[0]
60
60
61 def getfile(self, fpout):
61 def getfile(self, fpout):
62 self.sendresponse('')
62 self.sendresponse('')
63 count = int(self.fin.readline())
63 count = int(self.fin.readline())
64 while count:
64 while count:
65 fpout.write(self.fin.read(count))
65 fpout.write(self.fin.read(count))
66 count = int(self.fin.readline())
66 count = int(self.fin.readline())
67
67
68 def redirect(self):
68 def redirect(self):
69 pass
69 pass
70
70
71 def groupchunks(self, fh):
72 return iter(lambda: fh.read(4096), '')
73
74 def compresschunks(self, chunks):
75 for chunk in chunks:
76 yield chunk
77
78 def sendresponse(self, v):
71 def sendresponse(self, v):
79 self.fout.write("%d\n" % len(v))
72 self.fout.write("%d\n" % len(v))
80 self.fout.write(v)
73 self.fout.write(v)
81 self.fout.flush()
74 self.fout.flush()
82
75
83 def sendstream(self, source):
76 def sendstream(self, source):
84 write = self.fout.write
77 write = self.fout.write
85 for chunk in source.gen:
78
79 if source.reader:
80 gen = iter(lambda: source.reader.read(4096), '')
81 else:
82 gen = source.gen
83
84 for chunk in gen:
86 write(chunk)
85 write(chunk)
87 self.fout.flush()
86 self.fout.flush()
88
87
89 def sendpushresponse(self, rsp):
88 def sendpushresponse(self, rsp):
90 self.sendresponse('')
89 self.sendresponse('')
91 self.sendresponse(str(rsp.res))
90 self.sendresponse(str(rsp.res))
92
91
93 def sendpusherror(self, rsp):
92 def sendpusherror(self, rsp):
94 self.sendresponse(rsp.res)
93 self.sendresponse(rsp.res)
95
94
96 def sendooberror(self, rsp):
95 def sendooberror(self, rsp):
97 self.ui.ferr.write('%s\n-\n' % rsp.message)
96 self.ui.ferr.write('%s\n-\n' % rsp.message)
98 self.ui.ferr.flush()
97 self.ui.ferr.flush()
99 self.fout.write('\n')
98 self.fout.write('\n')
100 self.fout.flush()
99 self.fout.flush()
101
100
102 def serve_forever(self):
101 def serve_forever(self):
103 try:
102 try:
104 while self.serve_one():
103 while self.serve_one():
105 pass
104 pass
106 finally:
105 finally:
107 if self.lock is not None:
106 if self.lock is not None:
108 self.lock.release()
107 self.lock.release()
109 sys.exit(0)
108 sys.exit(0)
110
109
111 handlers = {
110 handlers = {
112 str: sendresponse,
111 str: sendresponse,
113 wireproto.streamres: sendstream,
112 wireproto.streamres: sendstream,
114 wireproto.pushres: sendpushresponse,
113 wireproto.pushres: sendpushresponse,
115 wireproto.pusherr: sendpusherror,
114 wireproto.pusherr: sendpusherror,
116 wireproto.ooberror: sendooberror,
115 wireproto.ooberror: sendooberror,
117 }
116 }
118
117
119 def serve_one(self):
118 def serve_one(self):
120 cmd = self.fin.readline()[:-1]
119 cmd = self.fin.readline()[:-1]
121 if cmd and cmd in wireproto.commands:
120 if cmd and cmd in wireproto.commands:
122 rsp = wireproto.dispatch(self.repo, self, cmd)
121 rsp = wireproto.dispatch(self.repo, self, cmd)
123 self.handlers[rsp.__class__](self, rsp)
122 self.handlers[rsp.__class__](self, rsp)
124 elif cmd:
123 elif cmd:
125 impl = getattr(self, 'do_' + cmd, None)
124 impl = getattr(self, 'do_' + cmd, None)
126 if impl:
125 if impl:
127 r = impl()
126 r = impl()
128 if r is not None:
127 if r is not None:
129 self.sendresponse(r)
128 self.sendresponse(r)
130 else: self.sendresponse("")
129 else: self.sendresponse("")
131 return cmd != ''
130 return cmd != ''
132
131
133 def _client(self):
132 def _client(self):
134 client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
133 client = os.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
135 return 'remote:ssh:' + client
134 return 'remote:ssh:' + client
@@ -1,965 +1,959
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import itertools
11 import itertools
12 import os
12 import os
13 import sys
13 import sys
14 import tempfile
14 import tempfile
15
15
16 from .i18n import _
16 from .i18n import _
17 from .node import (
17 from .node import (
18 bin,
18 bin,
19 hex,
19 hex,
20 )
20 )
21
21
22 from . import (
22 from . import (
23 bundle2,
23 bundle2,
24 changegroup as changegroupmod,
24 changegroup as changegroupmod,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 streamclone,
30 streamclone,
31 util,
31 util,
32 )
32 )
33
33
34 urlerr = util.urlerr
34 urlerr = util.urlerr
35 urlreq = util.urlreq
35 urlreq = util.urlreq
36
36
37 bundle2required = _(
37 bundle2required = _(
38 'incompatible Mercurial client; bundle2 required\n'
38 'incompatible Mercurial client; bundle2 required\n'
39 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
39 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
40
40
41 class abstractserverproto(object):
41 class abstractserverproto(object):
42 """abstract class that summarizes the protocol API
42 """abstract class that summarizes the protocol API
43
43
44 Used as reference and documentation.
44 Used as reference and documentation.
45 """
45 """
46
46
47 def getargs(self, args):
47 def getargs(self, args):
48 """return the value for arguments in <args>
48 """return the value for arguments in <args>
49
49
50 returns a list of values (same order as <args>)"""
50 returns a list of values (same order as <args>)"""
51 raise NotImplementedError()
51 raise NotImplementedError()
52
52
53 def getfile(self, fp):
53 def getfile(self, fp):
54 """write the whole content of a file into a file like object
54 """write the whole content of a file into a file like object
55
55
56 The file is in the form::
56 The file is in the form::
57
57
58 (<chunk-size>\n<chunk>)+0\n
58 (<chunk-size>\n<chunk>)+0\n
59
59
60 chunk size is the ascii version of the int.
60 chunk size is the ascii version of the int.
61 """
61 """
62 raise NotImplementedError()
62 raise NotImplementedError()
63
63
64 def redirect(self):
64 def redirect(self):
65 """may setup interception for stdout and stderr
65 """may setup interception for stdout and stderr
66
66
67 See also the `restore` method."""
67 See also the `restore` method."""
68 raise NotImplementedError()
68 raise NotImplementedError()
69
69
70 # If the `redirect` function does install interception, the `restore`
70 # If the `redirect` function does install interception, the `restore`
71 # function MUST be defined. If interception is not used, this function
71 # function MUST be defined. If interception is not used, this function
72 # MUST NOT be defined.
72 # MUST NOT be defined.
73 #
73 #
74 # left commented here on purpose
74 # left commented here on purpose
75 #
75 #
76 #def restore(self):
76 #def restore(self):
77 # """reinstall previous stdout and stderr and return intercepted stdout
77 # """reinstall previous stdout and stderr and return intercepted stdout
78 # """
78 # """
79 # raise NotImplementedError()
79 # raise NotImplementedError()
80
80
81 def groupchunks(self, fh):
82 """Generator of chunks to send to the client.
83
84 Some protocols may have compressed the contents.
85 """
86 raise NotImplementedError()
87
88 def compresschunks(self, chunks):
89 """Generator of possible compressed chunks to send to the client.
90
91 This is like ``groupchunks()`` except it accepts a generator as
92 its argument.
93 """
94 raise NotImplementedError()
95
96 class remotebatch(peer.batcher):
81 class remotebatch(peer.batcher):
97 '''batches the queued calls; uses as few roundtrips as possible'''
82 '''batches the queued calls; uses as few roundtrips as possible'''
98 def __init__(self, remote):
83 def __init__(self, remote):
99 '''remote must support _submitbatch(encbatch) and
84 '''remote must support _submitbatch(encbatch) and
100 _submitone(op, encargs)'''
85 _submitone(op, encargs)'''
101 peer.batcher.__init__(self)
86 peer.batcher.__init__(self)
102 self.remote = remote
87 self.remote = remote
103 def submit(self):
88 def submit(self):
104 req, rsp = [], []
89 req, rsp = [], []
105 for name, args, opts, resref in self.calls:
90 for name, args, opts, resref in self.calls:
106 mtd = getattr(self.remote, name)
91 mtd = getattr(self.remote, name)
107 batchablefn = getattr(mtd, 'batchable', None)
92 batchablefn = getattr(mtd, 'batchable', None)
108 if batchablefn is not None:
93 if batchablefn is not None:
109 batchable = batchablefn(mtd.im_self, *args, **opts)
94 batchable = batchablefn(mtd.im_self, *args, **opts)
110 encargsorres, encresref = next(batchable)
95 encargsorres, encresref = next(batchable)
111 if encresref:
96 if encresref:
112 req.append((name, encargsorres,))
97 req.append((name, encargsorres,))
113 rsp.append((batchable, encresref, resref,))
98 rsp.append((batchable, encresref, resref,))
114 else:
99 else:
115 resref.set(encargsorres)
100 resref.set(encargsorres)
116 else:
101 else:
117 if req:
102 if req:
118 self._submitreq(req, rsp)
103 self._submitreq(req, rsp)
119 req, rsp = [], []
104 req, rsp = [], []
120 resref.set(mtd(*args, **opts))
105 resref.set(mtd(*args, **opts))
121 if req:
106 if req:
122 self._submitreq(req, rsp)
107 self._submitreq(req, rsp)
123 def _submitreq(self, req, rsp):
108 def _submitreq(self, req, rsp):
124 encresults = self.remote._submitbatch(req)
109 encresults = self.remote._submitbatch(req)
125 for encres, r in zip(encresults, rsp):
110 for encres, r in zip(encresults, rsp):
126 batchable, encresref, resref = r
111 batchable, encresref, resref = r
127 encresref.set(encres)
112 encresref.set(encres)
128 resref.set(next(batchable))
113 resref.set(next(batchable))
129
114
130 class remoteiterbatcher(peer.iterbatcher):
115 class remoteiterbatcher(peer.iterbatcher):
131 def __init__(self, remote):
116 def __init__(self, remote):
132 super(remoteiterbatcher, self).__init__()
117 super(remoteiterbatcher, self).__init__()
133 self._remote = remote
118 self._remote = remote
134
119
135 def __getattr__(self, name):
120 def __getattr__(self, name):
136 if not getattr(self._remote, name, False):
121 if not getattr(self._remote, name, False):
137 raise AttributeError(
122 raise AttributeError(
138 'Attempted to iterbatch non-batchable call to %r' % name)
123 'Attempted to iterbatch non-batchable call to %r' % name)
139 return super(remoteiterbatcher, self).__getattr__(name)
124 return super(remoteiterbatcher, self).__getattr__(name)
140
125
141 def submit(self):
126 def submit(self):
142 """Break the batch request into many patch calls and pipeline them.
127 """Break the batch request into many patch calls and pipeline them.
143
128
144 This is mostly valuable over http where request sizes can be
129 This is mostly valuable over http where request sizes can be
145 limited, but can be used in other places as well.
130 limited, but can be used in other places as well.
146 """
131 """
147 req, rsp = [], []
132 req, rsp = [], []
148 for name, args, opts, resref in self.calls:
133 for name, args, opts, resref in self.calls:
149 mtd = getattr(self._remote, name)
134 mtd = getattr(self._remote, name)
150 batchable = mtd.batchable(mtd.im_self, *args, **opts)
135 batchable = mtd.batchable(mtd.im_self, *args, **opts)
151 encargsorres, encresref = next(batchable)
136 encargsorres, encresref = next(batchable)
152 assert encresref
137 assert encresref
153 req.append((name, encargsorres))
138 req.append((name, encargsorres))
154 rsp.append((batchable, encresref))
139 rsp.append((batchable, encresref))
155 if req:
140 if req:
156 self._resultiter = self._remote._submitbatch(req)
141 self._resultiter = self._remote._submitbatch(req)
157 self._rsp = rsp
142 self._rsp = rsp
158
143
159 def results(self):
144 def results(self):
160 for (batchable, encresref), encres in itertools.izip(
145 for (batchable, encresref), encres in itertools.izip(
161 self._rsp, self._resultiter):
146 self._rsp, self._resultiter):
162 encresref.set(encres)
147 encresref.set(encres)
163 yield next(batchable)
148 yield next(batchable)
164
149
165 # Forward a couple of names from peer to make wireproto interactions
150 # Forward a couple of names from peer to make wireproto interactions
166 # slightly more sensible.
151 # slightly more sensible.
167 batchable = peer.batchable
152 batchable = peer.batchable
168 future = peer.future
153 future = peer.future
169
154
170 # list of nodes encoding / decoding
155 # list of nodes encoding / decoding
171
156
172 def decodelist(l, sep=' '):
157 def decodelist(l, sep=' '):
173 if l:
158 if l:
174 return map(bin, l.split(sep))
159 return map(bin, l.split(sep))
175 return []
160 return []
176
161
177 def encodelist(l, sep=' '):
162 def encodelist(l, sep=' '):
178 try:
163 try:
179 return sep.join(map(hex, l))
164 return sep.join(map(hex, l))
180 except TypeError:
165 except TypeError:
181 raise
166 raise
182
167
183 # batched call argument encoding
168 # batched call argument encoding
184
169
185 def escapearg(plain):
170 def escapearg(plain):
186 return (plain
171 return (plain
187 .replace(':', ':c')
172 .replace(':', ':c')
188 .replace(',', ':o')
173 .replace(',', ':o')
189 .replace(';', ':s')
174 .replace(';', ':s')
190 .replace('=', ':e'))
175 .replace('=', ':e'))
191
176
192 def unescapearg(escaped):
177 def unescapearg(escaped):
193 return (escaped
178 return (escaped
194 .replace(':e', '=')
179 .replace(':e', '=')
195 .replace(':s', ';')
180 .replace(':s', ';')
196 .replace(':o', ',')
181 .replace(':o', ',')
197 .replace(':c', ':'))
182 .replace(':c', ':'))
198
183
199 def encodebatchcmds(req):
184 def encodebatchcmds(req):
200 """Return a ``cmds`` argument value for the ``batch`` command."""
185 """Return a ``cmds`` argument value for the ``batch`` command."""
201 cmds = []
186 cmds = []
202 for op, argsdict in req:
187 for op, argsdict in req:
203 # Old servers didn't properly unescape argument names. So prevent
188 # Old servers didn't properly unescape argument names. So prevent
204 # the sending of argument names that may not be decoded properly by
189 # the sending of argument names that may not be decoded properly by
205 # servers.
190 # servers.
206 assert all(escapearg(k) == k for k in argsdict)
191 assert all(escapearg(k) == k for k in argsdict)
207
192
208 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
193 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
209 for k, v in argsdict.iteritems())
194 for k, v in argsdict.iteritems())
210 cmds.append('%s %s' % (op, args))
195 cmds.append('%s %s' % (op, args))
211
196
212 return ';'.join(cmds)
197 return ';'.join(cmds)
213
198
214 # mapping of options accepted by getbundle and their types
199 # mapping of options accepted by getbundle and their types
215 #
200 #
216 # Meant to be extended by extensions. It is extensions responsibility to ensure
201 # Meant to be extended by extensions. It is extensions responsibility to ensure
217 # such options are properly processed in exchange.getbundle.
202 # such options are properly processed in exchange.getbundle.
218 #
203 #
219 # supported types are:
204 # supported types are:
220 #
205 #
221 # :nodes: list of binary nodes
206 # :nodes: list of binary nodes
222 # :csv: list of comma-separated values
207 # :csv: list of comma-separated values
223 # :scsv: list of comma-separated values return as set
208 # :scsv: list of comma-separated values return as set
224 # :plain: string with no transformation needed.
209 # :plain: string with no transformation needed.
225 gboptsmap = {'heads': 'nodes',
210 gboptsmap = {'heads': 'nodes',
226 'common': 'nodes',
211 'common': 'nodes',
227 'obsmarkers': 'boolean',
212 'obsmarkers': 'boolean',
228 'bundlecaps': 'scsv',
213 'bundlecaps': 'scsv',
229 'listkeys': 'csv',
214 'listkeys': 'csv',
230 'cg': 'boolean',
215 'cg': 'boolean',
231 'cbattempted': 'boolean'}
216 'cbattempted': 'boolean'}
232
217
233 # client side
218 # client side
234
219
235 class wirepeer(peer.peerrepository):
220 class wirepeer(peer.peerrepository):
236 """Client-side interface for communicating with a peer repository.
221 """Client-side interface for communicating with a peer repository.
237
222
238 Methods commonly call wire protocol commands of the same name.
223 Methods commonly call wire protocol commands of the same name.
239
224
240 See also httppeer.py and sshpeer.py for protocol-specific
225 See also httppeer.py and sshpeer.py for protocol-specific
241 implementations of this interface.
226 implementations of this interface.
242 """
227 """
243 def batch(self):
228 def batch(self):
244 if self.capable('batch'):
229 if self.capable('batch'):
245 return remotebatch(self)
230 return remotebatch(self)
246 else:
231 else:
247 return peer.localbatch(self)
232 return peer.localbatch(self)
248 def _submitbatch(self, req):
233 def _submitbatch(self, req):
249 """run batch request <req> on the server
234 """run batch request <req> on the server
250
235
251 Returns an iterator of the raw responses from the server.
236 Returns an iterator of the raw responses from the server.
252 """
237 """
253 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
238 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
254 chunk = rsp.read(1024)
239 chunk = rsp.read(1024)
255 work = [chunk]
240 work = [chunk]
256 while chunk:
241 while chunk:
257 while ';' not in chunk and chunk:
242 while ';' not in chunk and chunk:
258 chunk = rsp.read(1024)
243 chunk = rsp.read(1024)
259 work.append(chunk)
244 work.append(chunk)
260 merged = ''.join(work)
245 merged = ''.join(work)
261 while ';' in merged:
246 while ';' in merged:
262 one, merged = merged.split(';', 1)
247 one, merged = merged.split(';', 1)
263 yield unescapearg(one)
248 yield unescapearg(one)
264 chunk = rsp.read(1024)
249 chunk = rsp.read(1024)
265 work = [merged, chunk]
250 work = [merged, chunk]
266 yield unescapearg(''.join(work))
251 yield unescapearg(''.join(work))
267
252
268 def _submitone(self, op, args):
253 def _submitone(self, op, args):
269 return self._call(op, **args)
254 return self._call(op, **args)
270
255
271 def iterbatch(self):
256 def iterbatch(self):
272 return remoteiterbatcher(self)
257 return remoteiterbatcher(self)
273
258
274 @batchable
259 @batchable
275 def lookup(self, key):
260 def lookup(self, key):
276 self.requirecap('lookup', _('look up remote revision'))
261 self.requirecap('lookup', _('look up remote revision'))
277 f = future()
262 f = future()
278 yield {'key': encoding.fromlocal(key)}, f
263 yield {'key': encoding.fromlocal(key)}, f
279 d = f.value
264 d = f.value
280 success, data = d[:-1].split(" ", 1)
265 success, data = d[:-1].split(" ", 1)
281 if int(success):
266 if int(success):
282 yield bin(data)
267 yield bin(data)
283 self._abort(error.RepoError(data))
268 self._abort(error.RepoError(data))
284
269
285 @batchable
270 @batchable
286 def heads(self):
271 def heads(self):
287 f = future()
272 f = future()
288 yield {}, f
273 yield {}, f
289 d = f.value
274 d = f.value
290 try:
275 try:
291 yield decodelist(d[:-1])
276 yield decodelist(d[:-1])
292 except ValueError:
277 except ValueError:
293 self._abort(error.ResponseError(_("unexpected response:"), d))
278 self._abort(error.ResponseError(_("unexpected response:"), d))
294
279
295 @batchable
280 @batchable
296 def known(self, nodes):
281 def known(self, nodes):
297 f = future()
282 f = future()
298 yield {'nodes': encodelist(nodes)}, f
283 yield {'nodes': encodelist(nodes)}, f
299 d = f.value
284 d = f.value
300 try:
285 try:
301 yield [bool(int(b)) for b in d]
286 yield [bool(int(b)) for b in d]
302 except ValueError:
287 except ValueError:
303 self._abort(error.ResponseError(_("unexpected response:"), d))
288 self._abort(error.ResponseError(_("unexpected response:"), d))
304
289
305 @batchable
290 @batchable
306 def branchmap(self):
291 def branchmap(self):
307 f = future()
292 f = future()
308 yield {}, f
293 yield {}, f
309 d = f.value
294 d = f.value
310 try:
295 try:
311 branchmap = {}
296 branchmap = {}
312 for branchpart in d.splitlines():
297 for branchpart in d.splitlines():
313 branchname, branchheads = branchpart.split(' ', 1)
298 branchname, branchheads = branchpart.split(' ', 1)
314 branchname = encoding.tolocal(urlreq.unquote(branchname))
299 branchname = encoding.tolocal(urlreq.unquote(branchname))
315 branchheads = decodelist(branchheads)
300 branchheads = decodelist(branchheads)
316 branchmap[branchname] = branchheads
301 branchmap[branchname] = branchheads
317 yield branchmap
302 yield branchmap
318 except TypeError:
303 except TypeError:
319 self._abort(error.ResponseError(_("unexpected response:"), d))
304 self._abort(error.ResponseError(_("unexpected response:"), d))
320
305
321 def branches(self, nodes):
306 def branches(self, nodes):
322 n = encodelist(nodes)
307 n = encodelist(nodes)
323 d = self._call("branches", nodes=n)
308 d = self._call("branches", nodes=n)
324 try:
309 try:
325 br = [tuple(decodelist(b)) for b in d.splitlines()]
310 br = [tuple(decodelist(b)) for b in d.splitlines()]
326 return br
311 return br
327 except ValueError:
312 except ValueError:
328 self._abort(error.ResponseError(_("unexpected response:"), d))
313 self._abort(error.ResponseError(_("unexpected response:"), d))
329
314
330 def between(self, pairs):
315 def between(self, pairs):
331 batch = 8 # avoid giant requests
316 batch = 8 # avoid giant requests
332 r = []
317 r = []
333 for i in xrange(0, len(pairs), batch):
318 for i in xrange(0, len(pairs), batch):
334 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
319 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
335 d = self._call("between", pairs=n)
320 d = self._call("between", pairs=n)
336 try:
321 try:
337 r.extend(l and decodelist(l) or [] for l in d.splitlines())
322 r.extend(l and decodelist(l) or [] for l in d.splitlines())
338 except ValueError:
323 except ValueError:
339 self._abort(error.ResponseError(_("unexpected response:"), d))
324 self._abort(error.ResponseError(_("unexpected response:"), d))
340 return r
325 return r
341
326
342 @batchable
327 @batchable
343 def pushkey(self, namespace, key, old, new):
328 def pushkey(self, namespace, key, old, new):
344 if not self.capable('pushkey'):
329 if not self.capable('pushkey'):
345 yield False, None
330 yield False, None
346 f = future()
331 f = future()
347 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
332 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
348 yield {'namespace': encoding.fromlocal(namespace),
333 yield {'namespace': encoding.fromlocal(namespace),
349 'key': encoding.fromlocal(key),
334 'key': encoding.fromlocal(key),
350 'old': encoding.fromlocal(old),
335 'old': encoding.fromlocal(old),
351 'new': encoding.fromlocal(new)}, f
336 'new': encoding.fromlocal(new)}, f
352 d = f.value
337 d = f.value
353 d, output = d.split('\n', 1)
338 d, output = d.split('\n', 1)
354 try:
339 try:
355 d = bool(int(d))
340 d = bool(int(d))
356 except ValueError:
341 except ValueError:
357 raise error.ResponseError(
342 raise error.ResponseError(
358 _('push failed (unexpected response):'), d)
343 _('push failed (unexpected response):'), d)
359 for l in output.splitlines(True):
344 for l in output.splitlines(True):
360 self.ui.status(_('remote: '), l)
345 self.ui.status(_('remote: '), l)
361 yield d
346 yield d
362
347
363 @batchable
348 @batchable
364 def listkeys(self, namespace):
349 def listkeys(self, namespace):
365 if not self.capable('pushkey'):
350 if not self.capable('pushkey'):
366 yield {}, None
351 yield {}, None
367 f = future()
352 f = future()
368 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
353 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
369 yield {'namespace': encoding.fromlocal(namespace)}, f
354 yield {'namespace': encoding.fromlocal(namespace)}, f
370 d = f.value
355 d = f.value
371 self.ui.debug('received listkey for "%s": %i bytes\n'
356 self.ui.debug('received listkey for "%s": %i bytes\n'
372 % (namespace, len(d)))
357 % (namespace, len(d)))
373 yield pushkeymod.decodekeys(d)
358 yield pushkeymod.decodekeys(d)
374
359
375 def stream_out(self):
360 def stream_out(self):
376 return self._callstream('stream_out')
361 return self._callstream('stream_out')
377
362
378 def changegroup(self, nodes, kind):
363 def changegroup(self, nodes, kind):
379 n = encodelist(nodes)
364 n = encodelist(nodes)
380 f = self._callcompressable("changegroup", roots=n)
365 f = self._callcompressable("changegroup", roots=n)
381 return changegroupmod.cg1unpacker(f, 'UN')
366 return changegroupmod.cg1unpacker(f, 'UN')
382
367
383 def changegroupsubset(self, bases, heads, kind):
368 def changegroupsubset(self, bases, heads, kind):
384 self.requirecap('changegroupsubset', _('look up remote changes'))
369 self.requirecap('changegroupsubset', _('look up remote changes'))
385 bases = encodelist(bases)
370 bases = encodelist(bases)
386 heads = encodelist(heads)
371 heads = encodelist(heads)
387 f = self._callcompressable("changegroupsubset",
372 f = self._callcompressable("changegroupsubset",
388 bases=bases, heads=heads)
373 bases=bases, heads=heads)
389 return changegroupmod.cg1unpacker(f, 'UN')
374 return changegroupmod.cg1unpacker(f, 'UN')
390
375
391 def getbundle(self, source, **kwargs):
376 def getbundle(self, source, **kwargs):
392 self.requirecap('getbundle', _('look up remote changes'))
377 self.requirecap('getbundle', _('look up remote changes'))
393 opts = {}
378 opts = {}
394 bundlecaps = kwargs.get('bundlecaps')
379 bundlecaps = kwargs.get('bundlecaps')
395 if bundlecaps is not None:
380 if bundlecaps is not None:
396 kwargs['bundlecaps'] = sorted(bundlecaps)
381 kwargs['bundlecaps'] = sorted(bundlecaps)
397 else:
382 else:
398 bundlecaps = () # kwargs could have it to None
383 bundlecaps = () # kwargs could have it to None
399 for key, value in kwargs.iteritems():
384 for key, value in kwargs.iteritems():
400 if value is None:
385 if value is None:
401 continue
386 continue
402 keytype = gboptsmap.get(key)
387 keytype = gboptsmap.get(key)
403 if keytype is None:
388 if keytype is None:
404 assert False, 'unexpected'
389 assert False, 'unexpected'
405 elif keytype == 'nodes':
390 elif keytype == 'nodes':
406 value = encodelist(value)
391 value = encodelist(value)
407 elif keytype in ('csv', 'scsv'):
392 elif keytype in ('csv', 'scsv'):
408 value = ','.join(value)
393 value = ','.join(value)
409 elif keytype == 'boolean':
394 elif keytype == 'boolean':
410 value = '%i' % bool(value)
395 value = '%i' % bool(value)
411 elif keytype != 'plain':
396 elif keytype != 'plain':
412 raise KeyError('unknown getbundle option type %s'
397 raise KeyError('unknown getbundle option type %s'
413 % keytype)
398 % keytype)
414 opts[key] = value
399 opts[key] = value
415 f = self._callcompressable("getbundle", **opts)
400 f = self._callcompressable("getbundle", **opts)
416 if any((cap.startswith('HG2') for cap in bundlecaps)):
401 if any((cap.startswith('HG2') for cap in bundlecaps)):
417 return bundle2.getunbundler(self.ui, f)
402 return bundle2.getunbundler(self.ui, f)
418 else:
403 else:
419 return changegroupmod.cg1unpacker(f, 'UN')
404 return changegroupmod.cg1unpacker(f, 'UN')
420
405
421 def unbundle(self, cg, heads, url):
406 def unbundle(self, cg, heads, url):
422 '''Send cg (a readable file-like object representing the
407 '''Send cg (a readable file-like object representing the
423 changegroup to push, typically a chunkbuffer object) to the
408 changegroup to push, typically a chunkbuffer object) to the
424 remote server as a bundle.
409 remote server as a bundle.
425
410
426 When pushing a bundle10 stream, return an integer indicating the
411 When pushing a bundle10 stream, return an integer indicating the
427 result of the push (see localrepository.addchangegroup()).
412 result of the push (see localrepository.addchangegroup()).
428
413
429 When pushing a bundle20 stream, return a bundle20 stream.
414 When pushing a bundle20 stream, return a bundle20 stream.
430
415
431 `url` is the url the client thinks it's pushing to, which is
416 `url` is the url the client thinks it's pushing to, which is
432 visible to hooks.
417 visible to hooks.
433 '''
418 '''
434
419
435 if heads != ['force'] and self.capable('unbundlehash'):
420 if heads != ['force'] and self.capable('unbundlehash'):
436 heads = encodelist(['hashed',
421 heads = encodelist(['hashed',
437 hashlib.sha1(''.join(sorted(heads))).digest()])
422 hashlib.sha1(''.join(sorted(heads))).digest()])
438 else:
423 else:
439 heads = encodelist(heads)
424 heads = encodelist(heads)
440
425
441 if util.safehasattr(cg, 'deltaheader'):
426 if util.safehasattr(cg, 'deltaheader'):
442 # this a bundle10, do the old style call sequence
427 # this a bundle10, do the old style call sequence
443 ret, output = self._callpush("unbundle", cg, heads=heads)
428 ret, output = self._callpush("unbundle", cg, heads=heads)
444 if ret == "":
429 if ret == "":
445 raise error.ResponseError(
430 raise error.ResponseError(
446 _('push failed:'), output)
431 _('push failed:'), output)
447 try:
432 try:
448 ret = int(ret)
433 ret = int(ret)
449 except ValueError:
434 except ValueError:
450 raise error.ResponseError(
435 raise error.ResponseError(
451 _('push failed (unexpected response):'), ret)
436 _('push failed (unexpected response):'), ret)
452
437
453 for l in output.splitlines(True):
438 for l in output.splitlines(True):
454 self.ui.status(_('remote: '), l)
439 self.ui.status(_('remote: '), l)
455 else:
440 else:
456 # bundle2 push. Send a stream, fetch a stream.
441 # bundle2 push. Send a stream, fetch a stream.
457 stream = self._calltwowaystream('unbundle', cg, heads=heads)
442 stream = self._calltwowaystream('unbundle', cg, heads=heads)
458 ret = bundle2.getunbundler(self.ui, stream)
443 ret = bundle2.getunbundler(self.ui, stream)
459 return ret
444 return ret
460
445
461 def debugwireargs(self, one, two, three=None, four=None, five=None):
446 def debugwireargs(self, one, two, three=None, four=None, five=None):
462 # don't pass optional arguments left at their default value
447 # don't pass optional arguments left at their default value
463 opts = {}
448 opts = {}
464 if three is not None:
449 if three is not None:
465 opts['three'] = three
450 opts['three'] = three
466 if four is not None:
451 if four is not None:
467 opts['four'] = four
452 opts['four'] = four
468 return self._call('debugwireargs', one=one, two=two, **opts)
453 return self._call('debugwireargs', one=one, two=two, **opts)
469
454
470 def _call(self, cmd, **args):
455 def _call(self, cmd, **args):
471 """execute <cmd> on the server
456 """execute <cmd> on the server
472
457
473 The command is expected to return a simple string.
458 The command is expected to return a simple string.
474
459
475 returns the server reply as a string."""
460 returns the server reply as a string."""
476 raise NotImplementedError()
461 raise NotImplementedError()
477
462
478 def _callstream(self, cmd, **args):
463 def _callstream(self, cmd, **args):
479 """execute <cmd> on the server
464 """execute <cmd> on the server
480
465
481 The command is expected to return a stream. Note that if the
466 The command is expected to return a stream. Note that if the
482 command doesn't return a stream, _callstream behaves
467 command doesn't return a stream, _callstream behaves
483 differently for ssh and http peers.
468 differently for ssh and http peers.
484
469
485 returns the server reply as a file like object.
470 returns the server reply as a file like object.
486 """
471 """
487 raise NotImplementedError()
472 raise NotImplementedError()
488
473
489 def _callcompressable(self, cmd, **args):
474 def _callcompressable(self, cmd, **args):
490 """execute <cmd> on the server
475 """execute <cmd> on the server
491
476
492 The command is expected to return a stream.
477 The command is expected to return a stream.
493
478
494 The stream may have been compressed in some implementations. This
479 The stream may have been compressed in some implementations. This
495 function takes care of the decompression. This is the only difference
480 function takes care of the decompression. This is the only difference
496 with _callstream.
481 with _callstream.
497
482
498 returns the server reply as a file like object.
483 returns the server reply as a file like object.
499 """
484 """
500 raise NotImplementedError()
485 raise NotImplementedError()
501
486
502 def _callpush(self, cmd, fp, **args):
487 def _callpush(self, cmd, fp, **args):
503 """execute a <cmd> on server
488 """execute a <cmd> on server
504
489
505 The command is expected to be related to a push. Push has a special
490 The command is expected to be related to a push. Push has a special
506 return method.
491 return method.
507
492
508 returns the server reply as a (ret, output) tuple. ret is either
493 returns the server reply as a (ret, output) tuple. ret is either
509 empty (error) or a stringified int.
494 empty (error) or a stringified int.
510 """
495 """
511 raise NotImplementedError()
496 raise NotImplementedError()
512
497
513 def _calltwowaystream(self, cmd, fp, **args):
498 def _calltwowaystream(self, cmd, fp, **args):
514 """execute <cmd> on server
499 """execute <cmd> on server
515
500
516 The command will send a stream to the server and get a stream in reply.
501 The command will send a stream to the server and get a stream in reply.
517 """
502 """
518 raise NotImplementedError()
503 raise NotImplementedError()
519
504
520 def _abort(self, exception):
505 def _abort(self, exception):
521 """clearly abort the wire protocol connection and raise the exception
506 """clearly abort the wire protocol connection and raise the exception
522 """
507 """
523 raise NotImplementedError()
508 raise NotImplementedError()
524
509
525 # server side
510 # server side
526
511
527 # wire protocol command can either return a string or one of these classes.
512 # wire protocol command can either return a string or one of these classes.
528 class streamres(object):
513 class streamres(object):
529 """wireproto reply: binary stream
514 """wireproto reply: binary stream
530
515
531 The call was successful and the result is a stream.
516 The call was successful and the result is a stream.
532 Iterate on the `self.gen` attribute to retrieve chunks.
517
518 Accepts either a generator or an object with a ``read(size)`` method.
519
520 ``v1compressible`` indicates whether this data can be compressed to
521 "version 1" clients (technically: HTTP peers using
522 application/mercurial-0.1 media type). This flag should NOT be used on
523 new commands because new clients should support a more modern compression
524 mechanism.
533 """
525 """
534 def __init__(self, gen):
526 def __init__(self, gen=None, reader=None, v1compressible=False):
535 self.gen = gen
527 self.gen = gen
528 self.reader = reader
529 self.v1compressible = v1compressible
536
530
537 class pushres(object):
531 class pushres(object):
538 """wireproto reply: success with simple integer return
532 """wireproto reply: success with simple integer return
539
533
540 The call was successful and returned an integer contained in `self.res`.
534 The call was successful and returned an integer contained in `self.res`.
541 """
535 """
542 def __init__(self, res):
536 def __init__(self, res):
543 self.res = res
537 self.res = res
544
538
545 class pusherr(object):
539 class pusherr(object):
546 """wireproto reply: failure
540 """wireproto reply: failure
547
541
548 The call failed. The `self.res` attribute contains the error message.
542 The call failed. The `self.res` attribute contains the error message.
549 """
543 """
550 def __init__(self, res):
544 def __init__(self, res):
551 self.res = res
545 self.res = res
552
546
553 class ooberror(object):
547 class ooberror(object):
554 """wireproto reply: failure of a batch of operation
548 """wireproto reply: failure of a batch of operation
555
549
556 Something failed during a batch call. The error message is stored in
550 Something failed during a batch call. The error message is stored in
557 `self.message`.
551 `self.message`.
558 """
552 """
559 def __init__(self, message):
553 def __init__(self, message):
560 self.message = message
554 self.message = message
561
555
562 def getdispatchrepo(repo, proto, command):
556 def getdispatchrepo(repo, proto, command):
563 """Obtain the repo used for processing wire protocol commands.
557 """Obtain the repo used for processing wire protocol commands.
564
558
565 The intent of this function is to serve as a monkeypatch point for
559 The intent of this function is to serve as a monkeypatch point for
566 extensions that need commands to operate on different repo views under
560 extensions that need commands to operate on different repo views under
567 specialized circumstances.
561 specialized circumstances.
568 """
562 """
569 return repo.filtered('served')
563 return repo.filtered('served')
570
564
571 def dispatch(repo, proto, command):
565 def dispatch(repo, proto, command):
572 repo = getdispatchrepo(repo, proto, command)
566 repo = getdispatchrepo(repo, proto, command)
573 func, spec = commands[command]
567 func, spec = commands[command]
574 args = proto.getargs(spec)
568 args = proto.getargs(spec)
575 return func(repo, proto, *args)
569 return func(repo, proto, *args)
576
570
577 def options(cmd, keys, others):
571 def options(cmd, keys, others):
578 opts = {}
572 opts = {}
579 for k in keys:
573 for k in keys:
580 if k in others:
574 if k in others:
581 opts[k] = others[k]
575 opts[k] = others[k]
582 del others[k]
576 del others[k]
583 if others:
577 if others:
584 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
578 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
585 % (cmd, ",".join(others)))
579 % (cmd, ",".join(others)))
586 return opts
580 return opts
587
581
588 def bundle1allowed(repo, action):
582 def bundle1allowed(repo, action):
589 """Whether a bundle1 operation is allowed from the server.
583 """Whether a bundle1 operation is allowed from the server.
590
584
591 Priority is:
585 Priority is:
592
586
593 1. server.bundle1gd.<action> (if generaldelta active)
587 1. server.bundle1gd.<action> (if generaldelta active)
594 2. server.bundle1.<action>
588 2. server.bundle1.<action>
595 3. server.bundle1gd (if generaldelta active)
589 3. server.bundle1gd (if generaldelta active)
596 4. server.bundle1
590 4. server.bundle1
597 """
591 """
598 ui = repo.ui
592 ui = repo.ui
599 gd = 'generaldelta' in repo.requirements
593 gd = 'generaldelta' in repo.requirements
600
594
601 if gd:
595 if gd:
602 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
596 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
603 if v is not None:
597 if v is not None:
604 return v
598 return v
605
599
606 v = ui.configbool('server', 'bundle1.%s' % action, None)
600 v = ui.configbool('server', 'bundle1.%s' % action, None)
607 if v is not None:
601 if v is not None:
608 return v
602 return v
609
603
610 if gd:
604 if gd:
611 v = ui.configbool('server', 'bundle1gd', None)
605 v = ui.configbool('server', 'bundle1gd', None)
612 if v is not None:
606 if v is not None:
613 return v
607 return v
614
608
615 return ui.configbool('server', 'bundle1', True)
609 return ui.configbool('server', 'bundle1', True)
616
610
617 # list of commands
611 # list of commands
618 commands = {}
612 commands = {}
619
613
620 def wireprotocommand(name, args=''):
614 def wireprotocommand(name, args=''):
621 """decorator for wire protocol command"""
615 """decorator for wire protocol command"""
622 def register(func):
616 def register(func):
623 commands[name] = (func, args)
617 commands[name] = (func, args)
624 return func
618 return func
625 return register
619 return register
626
620
627 @wireprotocommand('batch', 'cmds *')
621 @wireprotocommand('batch', 'cmds *')
628 def batch(repo, proto, cmds, others):
622 def batch(repo, proto, cmds, others):
629 repo = repo.filtered("served")
623 repo = repo.filtered("served")
630 res = []
624 res = []
631 for pair in cmds.split(';'):
625 for pair in cmds.split(';'):
632 op, args = pair.split(' ', 1)
626 op, args = pair.split(' ', 1)
633 vals = {}
627 vals = {}
634 for a in args.split(','):
628 for a in args.split(','):
635 if a:
629 if a:
636 n, v = a.split('=')
630 n, v = a.split('=')
637 vals[unescapearg(n)] = unescapearg(v)
631 vals[unescapearg(n)] = unescapearg(v)
638 func, spec = commands[op]
632 func, spec = commands[op]
639 if spec:
633 if spec:
640 keys = spec.split()
634 keys = spec.split()
641 data = {}
635 data = {}
642 for k in keys:
636 for k in keys:
643 if k == '*':
637 if k == '*':
644 star = {}
638 star = {}
645 for key in vals.keys():
639 for key in vals.keys():
646 if key not in keys:
640 if key not in keys:
647 star[key] = vals[key]
641 star[key] = vals[key]
648 data['*'] = star
642 data['*'] = star
649 else:
643 else:
650 data[k] = vals[k]
644 data[k] = vals[k]
651 result = func(repo, proto, *[data[k] for k in keys])
645 result = func(repo, proto, *[data[k] for k in keys])
652 else:
646 else:
653 result = func(repo, proto)
647 result = func(repo, proto)
654 if isinstance(result, ooberror):
648 if isinstance(result, ooberror):
655 return result
649 return result
656 res.append(escapearg(result))
650 res.append(escapearg(result))
657 return ';'.join(res)
651 return ';'.join(res)
658
652
659 @wireprotocommand('between', 'pairs')
653 @wireprotocommand('between', 'pairs')
660 def between(repo, proto, pairs):
654 def between(repo, proto, pairs):
661 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
655 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
662 r = []
656 r = []
663 for b in repo.between(pairs):
657 for b in repo.between(pairs):
664 r.append(encodelist(b) + "\n")
658 r.append(encodelist(b) + "\n")
665 return "".join(r)
659 return "".join(r)
666
660
667 @wireprotocommand('branchmap')
661 @wireprotocommand('branchmap')
668 def branchmap(repo, proto):
662 def branchmap(repo, proto):
669 branchmap = repo.branchmap()
663 branchmap = repo.branchmap()
670 heads = []
664 heads = []
671 for branch, nodes in branchmap.iteritems():
665 for branch, nodes in branchmap.iteritems():
672 branchname = urlreq.quote(encoding.fromlocal(branch))
666 branchname = urlreq.quote(encoding.fromlocal(branch))
673 branchnodes = encodelist(nodes)
667 branchnodes = encodelist(nodes)
674 heads.append('%s %s' % (branchname, branchnodes))
668 heads.append('%s %s' % (branchname, branchnodes))
675 return '\n'.join(heads)
669 return '\n'.join(heads)
676
670
677 @wireprotocommand('branches', 'nodes')
671 @wireprotocommand('branches', 'nodes')
678 def branches(repo, proto, nodes):
672 def branches(repo, proto, nodes):
679 nodes = decodelist(nodes)
673 nodes = decodelist(nodes)
680 r = []
674 r = []
681 for b in repo.branches(nodes):
675 for b in repo.branches(nodes):
682 r.append(encodelist(b) + "\n")
676 r.append(encodelist(b) + "\n")
683 return "".join(r)
677 return "".join(r)
684
678
685 @wireprotocommand('clonebundles', '')
679 @wireprotocommand('clonebundles', '')
686 def clonebundles(repo, proto):
680 def clonebundles(repo, proto):
687 """Server command for returning info for available bundles to seed clones.
681 """Server command for returning info for available bundles to seed clones.
688
682
689 Clients will parse this response and determine what bundle to fetch.
683 Clients will parse this response and determine what bundle to fetch.
690
684
691 Extensions may wrap this command to filter or dynamically emit data
685 Extensions may wrap this command to filter or dynamically emit data
692 depending on the request. e.g. you could advertise URLs for the closest
686 depending on the request. e.g. you could advertise URLs for the closest
693 data center given the client's IP address.
687 data center given the client's IP address.
694 """
688 """
695 return repo.opener.tryread('clonebundles.manifest')
689 return repo.opener.tryread('clonebundles.manifest')
696
690
697 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
691 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
698 'known', 'getbundle', 'unbundlehash', 'batch']
692 'known', 'getbundle', 'unbundlehash', 'batch']
699
693
700 def _capabilities(repo, proto):
694 def _capabilities(repo, proto):
701 """return a list of capabilities for a repo
695 """return a list of capabilities for a repo
702
696
703 This function exists to allow extensions to easily wrap capabilities
697 This function exists to allow extensions to easily wrap capabilities
704 computation
698 computation
705
699
706 - returns a lists: easy to alter
700 - returns a lists: easy to alter
707 - change done here will be propagated to both `capabilities` and `hello`
701 - change done here will be propagated to both `capabilities` and `hello`
708 command without any other action needed.
702 command without any other action needed.
709 """
703 """
710 # copy to prevent modification of the global list
704 # copy to prevent modification of the global list
711 caps = list(wireprotocaps)
705 caps = list(wireprotocaps)
712 if streamclone.allowservergeneration(repo.ui):
706 if streamclone.allowservergeneration(repo.ui):
713 if repo.ui.configbool('server', 'preferuncompressed', False):
707 if repo.ui.configbool('server', 'preferuncompressed', False):
714 caps.append('stream-preferred')
708 caps.append('stream-preferred')
715 requiredformats = repo.requirements & repo.supportedformats
709 requiredformats = repo.requirements & repo.supportedformats
716 # if our local revlogs are just revlogv1, add 'stream' cap
710 # if our local revlogs are just revlogv1, add 'stream' cap
717 if not requiredformats - set(('revlogv1',)):
711 if not requiredformats - set(('revlogv1',)):
718 caps.append('stream')
712 caps.append('stream')
719 # otherwise, add 'streamreqs' detailing our local revlog format
713 # otherwise, add 'streamreqs' detailing our local revlog format
720 else:
714 else:
721 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
715 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
722 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
716 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
723 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
717 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
724 caps.append('bundle2=' + urlreq.quote(capsblob))
718 caps.append('bundle2=' + urlreq.quote(capsblob))
725 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
719 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
726 caps.append(
720 caps.append(
727 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
721 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
728 if repo.ui.configbool('experimental', 'httppostargs', False):
722 if repo.ui.configbool('experimental', 'httppostargs', False):
729 caps.append('httppostargs')
723 caps.append('httppostargs')
730 return caps
724 return caps
731
725
732 # If you are writing an extension and consider wrapping this function. Wrap
726 # If you are writing an extension and consider wrapping this function. Wrap
733 # `_capabilities` instead.
727 # `_capabilities` instead.
734 @wireprotocommand('capabilities')
728 @wireprotocommand('capabilities')
735 def capabilities(repo, proto):
729 def capabilities(repo, proto):
736 return ' '.join(_capabilities(repo, proto))
730 return ' '.join(_capabilities(repo, proto))
737
731
738 @wireprotocommand('changegroup', 'roots')
732 @wireprotocommand('changegroup', 'roots')
739 def changegroup(repo, proto, roots):
733 def changegroup(repo, proto, roots):
740 nodes = decodelist(roots)
734 nodes = decodelist(roots)
741 cg = changegroupmod.changegroup(repo, nodes, 'serve')
735 cg = changegroupmod.changegroup(repo, nodes, 'serve')
742 return streamres(proto.groupchunks(cg))
736 return streamres(reader=cg, v1compressible=True)
743
737
744 @wireprotocommand('changegroupsubset', 'bases heads')
738 @wireprotocommand('changegroupsubset', 'bases heads')
745 def changegroupsubset(repo, proto, bases, heads):
739 def changegroupsubset(repo, proto, bases, heads):
746 bases = decodelist(bases)
740 bases = decodelist(bases)
747 heads = decodelist(heads)
741 heads = decodelist(heads)
748 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
742 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
749 return streamres(proto.groupchunks(cg))
743 return streamres(reader=cg, v1compressible=True)
750
744
751 @wireprotocommand('debugwireargs', 'one two *')
745 @wireprotocommand('debugwireargs', 'one two *')
752 def debugwireargs(repo, proto, one, two, others):
746 def debugwireargs(repo, proto, one, two, others):
753 # only accept optional args from the known set
747 # only accept optional args from the known set
754 opts = options('debugwireargs', ['three', 'four'], others)
748 opts = options('debugwireargs', ['three', 'four'], others)
755 return repo.debugwireargs(one, two, **opts)
749 return repo.debugwireargs(one, two, **opts)
756
750
757 @wireprotocommand('getbundle', '*')
751 @wireprotocommand('getbundle', '*')
758 def getbundle(repo, proto, others):
752 def getbundle(repo, proto, others):
759 opts = options('getbundle', gboptsmap.keys(), others)
753 opts = options('getbundle', gboptsmap.keys(), others)
760 for k, v in opts.iteritems():
754 for k, v in opts.iteritems():
761 keytype = gboptsmap[k]
755 keytype = gboptsmap[k]
762 if keytype == 'nodes':
756 if keytype == 'nodes':
763 opts[k] = decodelist(v)
757 opts[k] = decodelist(v)
764 elif keytype == 'csv':
758 elif keytype == 'csv':
765 opts[k] = list(v.split(','))
759 opts[k] = list(v.split(','))
766 elif keytype == 'scsv':
760 elif keytype == 'scsv':
767 opts[k] = set(v.split(','))
761 opts[k] = set(v.split(','))
768 elif keytype == 'boolean':
762 elif keytype == 'boolean':
769 # Client should serialize False as '0', which is a non-empty string
763 # Client should serialize False as '0', which is a non-empty string
770 # so it evaluates as a True bool.
764 # so it evaluates as a True bool.
771 if v == '0':
765 if v == '0':
772 opts[k] = False
766 opts[k] = False
773 else:
767 else:
774 opts[k] = bool(v)
768 opts[k] = bool(v)
775 elif keytype != 'plain':
769 elif keytype != 'plain':
776 raise KeyError('unknown getbundle option type %s'
770 raise KeyError('unknown getbundle option type %s'
777 % keytype)
771 % keytype)
778
772
779 if not bundle1allowed(repo, 'pull'):
773 if not bundle1allowed(repo, 'pull'):
780 if not exchange.bundle2requested(opts.get('bundlecaps')):
774 if not exchange.bundle2requested(opts.get('bundlecaps')):
781 return ooberror(bundle2required)
775 return ooberror(bundle2required)
782
776
783 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
777 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
784 return streamres(proto.compresschunks(chunks))
778 return streamres(gen=chunks, v1compressible=True)
785
779
786 @wireprotocommand('heads')
780 @wireprotocommand('heads')
787 def heads(repo, proto):
781 def heads(repo, proto):
788 h = repo.heads()
782 h = repo.heads()
789 return encodelist(h) + "\n"
783 return encodelist(h) + "\n"
790
784
791 @wireprotocommand('hello')
785 @wireprotocommand('hello')
792 def hello(repo, proto):
786 def hello(repo, proto):
793 '''the hello command returns a set of lines describing various
787 '''the hello command returns a set of lines describing various
794 interesting things about the server, in an RFC822-like format.
788 interesting things about the server, in an RFC822-like format.
795 Currently the only one defined is "capabilities", which
789 Currently the only one defined is "capabilities", which
796 consists of a line in the form:
790 consists of a line in the form:
797
791
798 capabilities: space separated list of tokens
792 capabilities: space separated list of tokens
799 '''
793 '''
800 return "capabilities: %s\n" % (capabilities(repo, proto))
794 return "capabilities: %s\n" % (capabilities(repo, proto))
801
795
802 @wireprotocommand('listkeys', 'namespace')
796 @wireprotocommand('listkeys', 'namespace')
803 def listkeys(repo, proto, namespace):
797 def listkeys(repo, proto, namespace):
804 d = repo.listkeys(encoding.tolocal(namespace)).items()
798 d = repo.listkeys(encoding.tolocal(namespace)).items()
805 return pushkeymod.encodekeys(d)
799 return pushkeymod.encodekeys(d)
806
800
807 @wireprotocommand('lookup', 'key')
801 @wireprotocommand('lookup', 'key')
808 def lookup(repo, proto, key):
802 def lookup(repo, proto, key):
809 try:
803 try:
810 k = encoding.tolocal(key)
804 k = encoding.tolocal(key)
811 c = repo[k]
805 c = repo[k]
812 r = c.hex()
806 r = c.hex()
813 success = 1
807 success = 1
814 except Exception as inst:
808 except Exception as inst:
815 r = str(inst)
809 r = str(inst)
816 success = 0
810 success = 0
817 return "%s %s\n" % (success, r)
811 return "%s %s\n" % (success, r)
818
812
819 @wireprotocommand('known', 'nodes *')
813 @wireprotocommand('known', 'nodes *')
820 def known(repo, proto, nodes, others):
814 def known(repo, proto, nodes, others):
821 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
815 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
822
816
823 @wireprotocommand('pushkey', 'namespace key old new')
817 @wireprotocommand('pushkey', 'namespace key old new')
824 def pushkey(repo, proto, namespace, key, old, new):
818 def pushkey(repo, proto, namespace, key, old, new):
825 # compatibility with pre-1.8 clients which were accidentally
819 # compatibility with pre-1.8 clients which were accidentally
826 # sending raw binary nodes rather than utf-8-encoded hex
820 # sending raw binary nodes rather than utf-8-encoded hex
827 if len(new) == 20 and new.encode('string-escape') != new:
821 if len(new) == 20 and new.encode('string-escape') != new:
828 # looks like it could be a binary node
822 # looks like it could be a binary node
829 try:
823 try:
830 new.decode('utf-8')
824 new.decode('utf-8')
831 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
825 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
832 except UnicodeDecodeError:
826 except UnicodeDecodeError:
833 pass # binary, leave unmodified
827 pass # binary, leave unmodified
834 else:
828 else:
835 new = encoding.tolocal(new) # normal path
829 new = encoding.tolocal(new) # normal path
836
830
837 if util.safehasattr(proto, 'restore'):
831 if util.safehasattr(proto, 'restore'):
838
832
839 proto.redirect()
833 proto.redirect()
840
834
841 try:
835 try:
842 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
836 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
843 encoding.tolocal(old), new) or False
837 encoding.tolocal(old), new) or False
844 except error.Abort:
838 except error.Abort:
845 r = False
839 r = False
846
840
847 output = proto.restore()
841 output = proto.restore()
848
842
849 return '%s\n%s' % (int(r), output)
843 return '%s\n%s' % (int(r), output)
850
844
851 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
845 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
852 encoding.tolocal(old), new)
846 encoding.tolocal(old), new)
853 return '%s\n' % int(r)
847 return '%s\n' % int(r)
854
848
855 @wireprotocommand('stream_out')
849 @wireprotocommand('stream_out')
856 def stream(repo, proto):
850 def stream(repo, proto):
857 '''If the server supports streaming clone, it advertises the "stream"
851 '''If the server supports streaming clone, it advertises the "stream"
858 capability with a value representing the version and flags of the repo
852 capability with a value representing the version and flags of the repo
859 it is serving. Client checks to see if it understands the format.
853 it is serving. Client checks to see if it understands the format.
860 '''
854 '''
861 if not streamclone.allowservergeneration(repo.ui):
855 if not streamclone.allowservergeneration(repo.ui):
862 return '1\n'
856 return '1\n'
863
857
864 def getstream(it):
858 def getstream(it):
865 yield '0\n'
859 yield '0\n'
866 for chunk in it:
860 for chunk in it:
867 yield chunk
861 yield chunk
868
862
869 try:
863 try:
870 # LockError may be raised before the first result is yielded. Don't
864 # LockError may be raised before the first result is yielded. Don't
871 # emit output until we're sure we got the lock successfully.
865 # emit output until we're sure we got the lock successfully.
872 it = streamclone.generatev1wireproto(repo)
866 it = streamclone.generatev1wireproto(repo)
873 return streamres(getstream(it))
867 return streamres(gen=getstream(it))
874 except error.LockError:
868 except error.LockError:
875 return '2\n'
869 return '2\n'
876
870
877 @wireprotocommand('unbundle', 'heads')
871 @wireprotocommand('unbundle', 'heads')
878 def unbundle(repo, proto, heads):
872 def unbundle(repo, proto, heads):
879 their_heads = decodelist(heads)
873 their_heads = decodelist(heads)
880
874
881 try:
875 try:
882 proto.redirect()
876 proto.redirect()
883
877
884 exchange.check_heads(repo, their_heads, 'preparing changes')
878 exchange.check_heads(repo, their_heads, 'preparing changes')
885
879
886 # write bundle data to temporary file because it can be big
880 # write bundle data to temporary file because it can be big
887 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
881 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
888 fp = os.fdopen(fd, 'wb+')
882 fp = os.fdopen(fd, 'wb+')
889 r = 0
883 r = 0
890 try:
884 try:
891 proto.getfile(fp)
885 proto.getfile(fp)
892 fp.seek(0)
886 fp.seek(0)
893 gen = exchange.readbundle(repo.ui, fp, None)
887 gen = exchange.readbundle(repo.ui, fp, None)
894 if (isinstance(gen, changegroupmod.cg1unpacker)
888 if (isinstance(gen, changegroupmod.cg1unpacker)
895 and not bundle1allowed(repo, 'push')):
889 and not bundle1allowed(repo, 'push')):
896 return ooberror(bundle2required)
890 return ooberror(bundle2required)
897
891
898 r = exchange.unbundle(repo, gen, their_heads, 'serve',
892 r = exchange.unbundle(repo, gen, their_heads, 'serve',
899 proto._client())
893 proto._client())
900 if util.safehasattr(r, 'addpart'):
894 if util.safehasattr(r, 'addpart'):
901 # The return looks streamable, we are in the bundle2 case and
895 # The return looks streamable, we are in the bundle2 case and
902 # should return a stream.
896 # should return a stream.
903 return streamres(r.getchunks())
897 return streamres(gen=r.getchunks())
904 return pushres(r)
898 return pushres(r)
905
899
906 finally:
900 finally:
907 fp.close()
901 fp.close()
908 os.unlink(tempname)
902 os.unlink(tempname)
909
903
910 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
904 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
911 # handle non-bundle2 case first
905 # handle non-bundle2 case first
912 if not getattr(exc, 'duringunbundle2', False):
906 if not getattr(exc, 'duringunbundle2', False):
913 try:
907 try:
914 raise
908 raise
915 except error.Abort:
909 except error.Abort:
916 # The old code we moved used sys.stderr directly.
910 # The old code we moved used sys.stderr directly.
917 # We did not change it to minimise code change.
911 # We did not change it to minimise code change.
918 # This need to be moved to something proper.
912 # This need to be moved to something proper.
919 # Feel free to do it.
913 # Feel free to do it.
920 sys.stderr.write("abort: %s\n" % exc)
914 sys.stderr.write("abort: %s\n" % exc)
921 return pushres(0)
915 return pushres(0)
922 except error.PushRaced:
916 except error.PushRaced:
923 return pusherr(str(exc))
917 return pusherr(str(exc))
924
918
925 bundler = bundle2.bundle20(repo.ui)
919 bundler = bundle2.bundle20(repo.ui)
926 for out in getattr(exc, '_bundle2salvagedoutput', ()):
920 for out in getattr(exc, '_bundle2salvagedoutput', ()):
927 bundler.addpart(out)
921 bundler.addpart(out)
928 try:
922 try:
929 try:
923 try:
930 raise
924 raise
931 except error.PushkeyFailed as exc:
925 except error.PushkeyFailed as exc:
932 # check client caps
926 # check client caps
933 remotecaps = getattr(exc, '_replycaps', None)
927 remotecaps = getattr(exc, '_replycaps', None)
934 if (remotecaps is not None
928 if (remotecaps is not None
935 and 'pushkey' not in remotecaps.get('error', ())):
929 and 'pushkey' not in remotecaps.get('error', ())):
936 # no support remote side, fallback to Abort handler.
930 # no support remote side, fallback to Abort handler.
937 raise
931 raise
938 part = bundler.newpart('error:pushkey')
932 part = bundler.newpart('error:pushkey')
939 part.addparam('in-reply-to', exc.partid)
933 part.addparam('in-reply-to', exc.partid)
940 if exc.namespace is not None:
934 if exc.namespace is not None:
941 part.addparam('namespace', exc.namespace, mandatory=False)
935 part.addparam('namespace', exc.namespace, mandatory=False)
942 if exc.key is not None:
936 if exc.key is not None:
943 part.addparam('key', exc.key, mandatory=False)
937 part.addparam('key', exc.key, mandatory=False)
944 if exc.new is not None:
938 if exc.new is not None:
945 part.addparam('new', exc.new, mandatory=False)
939 part.addparam('new', exc.new, mandatory=False)
946 if exc.old is not None:
940 if exc.old is not None:
947 part.addparam('old', exc.old, mandatory=False)
941 part.addparam('old', exc.old, mandatory=False)
948 if exc.ret is not None:
942 if exc.ret is not None:
949 part.addparam('ret', exc.ret, mandatory=False)
943 part.addparam('ret', exc.ret, mandatory=False)
950 except error.BundleValueError as exc:
944 except error.BundleValueError as exc:
951 errpart = bundler.newpart('error:unsupportedcontent')
945 errpart = bundler.newpart('error:unsupportedcontent')
952 if exc.parttype is not None:
946 if exc.parttype is not None:
953 errpart.addparam('parttype', exc.parttype)
947 errpart.addparam('parttype', exc.parttype)
954 if exc.params:
948 if exc.params:
955 errpart.addparam('params', '\0'.join(exc.params))
949 errpart.addparam('params', '\0'.join(exc.params))
956 except error.Abort as exc:
950 except error.Abort as exc:
957 manargs = [('message', str(exc))]
951 manargs = [('message', str(exc))]
958 advargs = []
952 advargs = []
959 if exc.hint is not None:
953 if exc.hint is not None:
960 advargs.append(('hint', exc.hint))
954 advargs.append(('hint', exc.hint))
961 bundler.addpart(bundle2.bundlepart('error:abort',
955 bundler.addpart(bundle2.bundlepart('error:abort',
962 manargs, advargs))
956 manargs, advargs))
963 except error.PushRaced as exc:
957 except error.PushRaced as exc:
964 bundler.newpart('error:pushraced', [('message', str(exc))])
958 bundler.newpart('error:pushraced', [('message', str(exc))])
965 return streamres(bundler.getchunks())
959 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now