##// END OF EJS Templates
wireproto: drop support for reader interface from streamres (API)...
Gregory Szorc -
r35723:8cdb671d default
parent child Browse files
Show More
@@ -1,210 +1,207
1 #
1 #
2 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import cgi
10 import cgi
11 import struct
11 import struct
12
12
13 from .common import (
13 from .common import (
14 HTTP_OK,
14 HTTP_OK,
15 )
15 )
16
16
17 from .. import (
17 from .. import (
18 error,
18 error,
19 pycompat,
19 pycompat,
20 util,
20 util,
21 wireproto,
21 wireproto,
22 )
22 )
23 stringio = util.stringio
23 stringio = util.stringio
24
24
25 urlerr = util.urlerr
25 urlerr = util.urlerr
26 urlreq = util.urlreq
26 urlreq = util.urlreq
27
27
28 HGTYPE = 'application/mercurial-0.1'
28 HGTYPE = 'application/mercurial-0.1'
29 HGTYPE2 = 'application/mercurial-0.2'
29 HGTYPE2 = 'application/mercurial-0.2'
30 HGERRTYPE = 'application/hg-error'
30 HGERRTYPE = 'application/hg-error'
31
31
32 def decodevaluefromheaders(req, headerprefix):
32 def decodevaluefromheaders(req, headerprefix):
33 """Decode a long value from multiple HTTP request headers.
33 """Decode a long value from multiple HTTP request headers.
34
34
35 Returns the value as a bytes, not a str.
35 Returns the value as a bytes, not a str.
36 """
36 """
37 chunks = []
37 chunks = []
38 i = 1
38 i = 1
39 prefix = headerprefix.upper().replace(r'-', r'_')
39 prefix = headerprefix.upper().replace(r'-', r'_')
40 while True:
40 while True:
41 v = req.env.get(r'HTTP_%s_%d' % (prefix, i))
41 v = req.env.get(r'HTTP_%s_%d' % (prefix, i))
42 if v is None:
42 if v is None:
43 break
43 break
44 chunks.append(pycompat.bytesurl(v))
44 chunks.append(pycompat.bytesurl(v))
45 i += 1
45 i += 1
46
46
47 return ''.join(chunks)
47 return ''.join(chunks)
48
48
49 class webproto(wireproto.abstractserverproto):
49 class webproto(wireproto.abstractserverproto):
50 def __init__(self, req, ui):
50 def __init__(self, req, ui):
51 self.req = req
51 self.req = req
52 self.response = ''
52 self.response = ''
53 self.ui = ui
53 self.ui = ui
54 self.name = 'http'
54 self.name = 'http'
55
55
56 def getargs(self, args):
56 def getargs(self, args):
57 knownargs = self._args()
57 knownargs = self._args()
58 data = {}
58 data = {}
59 keys = args.split()
59 keys = args.split()
60 for k in keys:
60 for k in keys:
61 if k == '*':
61 if k == '*':
62 star = {}
62 star = {}
63 for key in knownargs.keys():
63 for key in knownargs.keys():
64 if key != 'cmd' and key not in keys:
64 if key != 'cmd' and key not in keys:
65 star[key] = knownargs[key][0]
65 star[key] = knownargs[key][0]
66 data['*'] = star
66 data['*'] = star
67 else:
67 else:
68 data[k] = knownargs[k][0]
68 data[k] = knownargs[k][0]
69 return [data[k] for k in keys]
69 return [data[k] for k in keys]
70 def _args(self):
70 def _args(self):
71 args = self.req.form.copy()
71 args = self.req.form.copy()
72 if pycompat.ispy3:
72 if pycompat.ispy3:
73 args = {k.encode('ascii'): [v.encode('ascii') for v in vs]
73 args = {k.encode('ascii'): [v.encode('ascii') for v in vs]
74 for k, vs in args.items()}
74 for k, vs in args.items()}
75 postlen = int(self.req.env.get(r'HTTP_X_HGARGS_POST', 0))
75 postlen = int(self.req.env.get(r'HTTP_X_HGARGS_POST', 0))
76 if postlen:
76 if postlen:
77 args.update(cgi.parse_qs(
77 args.update(cgi.parse_qs(
78 self.req.read(postlen), keep_blank_values=True))
78 self.req.read(postlen), keep_blank_values=True))
79 return args
79 return args
80
80
81 argvalue = decodevaluefromheaders(self.req, r'X-HgArg')
81 argvalue = decodevaluefromheaders(self.req, r'X-HgArg')
82 args.update(cgi.parse_qs(argvalue, keep_blank_values=True))
82 args.update(cgi.parse_qs(argvalue, keep_blank_values=True))
83 return args
83 return args
84 def getfile(self, fp):
84 def getfile(self, fp):
85 length = int(self.req.env[r'CONTENT_LENGTH'])
85 length = int(self.req.env[r'CONTENT_LENGTH'])
86 # If httppostargs is used, we need to read Content-Length
86 # If httppostargs is used, we need to read Content-Length
87 # minus the amount that was consumed by args.
87 # minus the amount that was consumed by args.
88 length -= int(self.req.env.get(r'HTTP_X_HGARGS_POST', 0))
88 length -= int(self.req.env.get(r'HTTP_X_HGARGS_POST', 0))
89 for s in util.filechunkiter(self.req, limit=length):
89 for s in util.filechunkiter(self.req, limit=length):
90 fp.write(s)
90 fp.write(s)
91 def redirect(self):
91 def redirect(self):
92 self.oldio = self.ui.fout, self.ui.ferr
92 self.oldio = self.ui.fout, self.ui.ferr
93 self.ui.ferr = self.ui.fout = stringio()
93 self.ui.ferr = self.ui.fout = stringio()
94 def restore(self):
94 def restore(self):
95 val = self.ui.fout.getvalue()
95 val = self.ui.fout.getvalue()
96 self.ui.ferr, self.ui.fout = self.oldio
96 self.ui.ferr, self.ui.fout = self.oldio
97 return val
97 return val
98
98
99 def _client(self):
99 def _client(self):
100 return 'remote:%s:%s:%s' % (
100 return 'remote:%s:%s:%s' % (
101 self.req.env.get('wsgi.url_scheme') or 'http',
101 self.req.env.get('wsgi.url_scheme') or 'http',
102 urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
102 urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
103 urlreq.quote(self.req.env.get('REMOTE_USER', '')))
103 urlreq.quote(self.req.env.get('REMOTE_USER', '')))
104
104
105 def responsetype(self, v1compressible=False):
105 def responsetype(self, v1compressible=False):
106 """Determine the appropriate response type and compression settings.
106 """Determine the appropriate response type and compression settings.
107
107
108 The ``v1compressible`` argument states whether the response with
108 The ``v1compressible`` argument states whether the response with
109 application/mercurial-0.1 media types should be zlib compressed.
109 application/mercurial-0.1 media types should be zlib compressed.
110
110
111 Returns a tuple of (mediatype, compengine, engineopts).
111 Returns a tuple of (mediatype, compengine, engineopts).
112 """
112 """
113 # For now, if it isn't compressible in the old world, it's never
113 # For now, if it isn't compressible in the old world, it's never
114 # compressible. We can change this to send uncompressed 0.2 payloads
114 # compressible. We can change this to send uncompressed 0.2 payloads
115 # later.
115 # later.
116 if not v1compressible:
116 if not v1compressible:
117 return HGTYPE, None, None
117 return HGTYPE, None, None
118
118
119 # Determine the response media type and compression engine based
119 # Determine the response media type and compression engine based
120 # on the request parameters.
120 # on the request parameters.
121 protocaps = decodevaluefromheaders(self.req, r'X-HgProto').split(' ')
121 protocaps = decodevaluefromheaders(self.req, r'X-HgProto').split(' ')
122
122
123 if '0.2' in protocaps:
123 if '0.2' in protocaps:
124 # Default as defined by wire protocol spec.
124 # Default as defined by wire protocol spec.
125 compformats = ['zlib', 'none']
125 compformats = ['zlib', 'none']
126 for cap in protocaps:
126 for cap in protocaps:
127 if cap.startswith('comp='):
127 if cap.startswith('comp='):
128 compformats = cap[5:].split(',')
128 compformats = cap[5:].split(',')
129 break
129 break
130
130
131 # Now find an agreed upon compression format.
131 # Now find an agreed upon compression format.
132 for engine in wireproto.supportedcompengines(self.ui, self,
132 for engine in wireproto.supportedcompengines(self.ui, self,
133 util.SERVERROLE):
133 util.SERVERROLE):
134 if engine.wireprotosupport().name in compformats:
134 if engine.wireprotosupport().name in compformats:
135 opts = {}
135 opts = {}
136 level = self.ui.configint('server',
136 level = self.ui.configint('server',
137 '%slevel' % engine.name())
137 '%slevel' % engine.name())
138 if level is not None:
138 if level is not None:
139 opts['level'] = level
139 opts['level'] = level
140
140
141 return HGTYPE2, engine, opts
141 return HGTYPE2, engine, opts
142
142
143 # No mutually supported compression format. Fall back to the
143 # No mutually supported compression format. Fall back to the
144 # legacy protocol.
144 # legacy protocol.
145
145
146 # Don't allow untrusted settings because disabling compression or
146 # Don't allow untrusted settings because disabling compression or
147 # setting a very high compression level could lead to flooding
147 # setting a very high compression level could lead to flooding
148 # the server's network or CPU.
148 # the server's network or CPU.
149 opts = {'level': self.ui.configint('server', 'zliblevel')}
149 opts = {'level': self.ui.configint('server', 'zliblevel')}
150 return HGTYPE, util.compengines['zlib'], opts
150 return HGTYPE, util.compengines['zlib'], opts
151
151
152 def iscmd(cmd):
152 def iscmd(cmd):
153 return cmd in wireproto.commands
153 return cmd in wireproto.commands
154
154
155 def call(repo, req, cmd):
155 def call(repo, req, cmd):
156 p = webproto(req, repo.ui)
156 p = webproto(req, repo.ui)
157
157
158 def genversion2(gen, compress, engine, engineopts):
158 def genversion2(gen, compress, engine, engineopts):
159 # application/mercurial-0.2 always sends a payload header
159 # application/mercurial-0.2 always sends a payload header
160 # identifying the compression engine.
160 # identifying the compression engine.
161 name = engine.wireprotosupport().name
161 name = engine.wireprotosupport().name
162 assert 0 < len(name) < 256
162 assert 0 < len(name) < 256
163 yield struct.pack('B', len(name))
163 yield struct.pack('B', len(name))
164 yield name
164 yield name
165
165
166 if compress:
166 if compress:
167 for chunk in engine.compressstream(gen, opts=engineopts):
167 for chunk in engine.compressstream(gen, opts=engineopts):
168 yield chunk
168 yield chunk
169 else:
169 else:
170 for chunk in gen:
170 for chunk in gen:
171 yield chunk
171 yield chunk
172
172
173 rsp = wireproto.dispatch(repo, p, cmd)
173 rsp = wireproto.dispatch(repo, p, cmd)
174 if isinstance(rsp, bytes):
174 if isinstance(rsp, bytes):
175 req.respond(HTTP_OK, HGTYPE, body=rsp)
175 req.respond(HTTP_OK, HGTYPE, body=rsp)
176 return []
176 return []
177 elif isinstance(rsp, wireproto.streamres):
177 elif isinstance(rsp, wireproto.streamres):
178 if rsp.reader:
179 gen = iter(lambda: rsp.reader.read(32768), '')
180 else:
181 gen = rsp.gen
178 gen = rsp.gen
182
179
183 # This code for compression should not be streamres specific. It
180 # This code for compression should not be streamres specific. It
184 # is here because we only compress streamres at the moment.
181 # is here because we only compress streamres at the moment.
185 mediatype, engine, engineopts = p.responsetype(rsp.v1compressible)
182 mediatype, engine, engineopts = p.responsetype(rsp.v1compressible)
186
183
187 if mediatype == HGTYPE and rsp.v1compressible:
184 if mediatype == HGTYPE and rsp.v1compressible:
188 gen = engine.compressstream(gen, engineopts)
185 gen = engine.compressstream(gen, engineopts)
189 elif mediatype == HGTYPE2:
186 elif mediatype == HGTYPE2:
190 gen = genversion2(gen, rsp.v1compressible, engine, engineopts)
187 gen = genversion2(gen, rsp.v1compressible, engine, engineopts)
191
188
192 req.respond(HTTP_OK, mediatype)
189 req.respond(HTTP_OK, mediatype)
193 return gen
190 return gen
194 elif isinstance(rsp, wireproto.pushres):
191 elif isinstance(rsp, wireproto.pushres):
195 val = p.restore()
192 val = p.restore()
196 rsp = '%d\n%s' % (rsp.res, val)
193 rsp = '%d\n%s' % (rsp.res, val)
197 req.respond(HTTP_OK, HGTYPE, body=rsp)
194 req.respond(HTTP_OK, HGTYPE, body=rsp)
198 return []
195 return []
199 elif isinstance(rsp, wireproto.pusherr):
196 elif isinstance(rsp, wireproto.pusherr):
200 # drain the incoming bundle
197 # drain the incoming bundle
201 req.drain()
198 req.drain()
202 p.restore()
199 p.restore()
203 rsp = '0\n%s\n' % rsp.res
200 rsp = '0\n%s\n' % rsp.res
204 req.respond(HTTP_OK, HGTYPE, body=rsp)
201 req.respond(HTTP_OK, HGTYPE, body=rsp)
205 return []
202 return []
206 elif isinstance(rsp, wireproto.ooberror):
203 elif isinstance(rsp, wireproto.ooberror):
207 rsp = rsp.message
204 rsp = rsp.message
208 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
205 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
209 return []
206 return []
210 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
207 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
@@ -1,136 +1,130
1 # sshserver.py - ssh protocol server support for mercurial
1 # sshserver.py - ssh protocol server support for mercurial
2 #
2 #
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 #
5 #
6 # This software may be used and distributed according to the terms of the
6 # This software may be used and distributed according to the terms of the
7 # GNU General Public License version 2 or any later version.
7 # GNU General Public License version 2 or any later version.
8
8
9 from __future__ import absolute_import
9 from __future__ import absolute_import
10
10
11 import sys
11 import sys
12
12
13 from .i18n import _
13 from .i18n import _
14 from . import (
14 from . import (
15 encoding,
15 encoding,
16 error,
16 error,
17 hook,
17 hook,
18 util,
18 util,
19 wireproto,
19 wireproto,
20 )
20 )
21
21
22 class sshserver(wireproto.abstractserverproto):
22 class sshserver(wireproto.abstractserverproto):
23 def __init__(self, ui, repo):
23 def __init__(self, ui, repo):
24 self.ui = ui
24 self.ui = ui
25 self.repo = repo
25 self.repo = repo
26 self.lock = None
26 self.lock = None
27 self.fin = ui.fin
27 self.fin = ui.fin
28 self.fout = ui.fout
28 self.fout = ui.fout
29 self.name = 'ssh'
29 self.name = 'ssh'
30
30
31 hook.redirect(True)
31 hook.redirect(True)
32 ui.fout = repo.ui.fout = ui.ferr
32 ui.fout = repo.ui.fout = ui.ferr
33
33
34 # Prevent insertion/deletion of CRs
34 # Prevent insertion/deletion of CRs
35 util.setbinary(self.fin)
35 util.setbinary(self.fin)
36 util.setbinary(self.fout)
36 util.setbinary(self.fout)
37
37
38 def getargs(self, args):
38 def getargs(self, args):
39 data = {}
39 data = {}
40 keys = args.split()
40 keys = args.split()
41 for n in xrange(len(keys)):
41 for n in xrange(len(keys)):
42 argline = self.fin.readline()[:-1]
42 argline = self.fin.readline()[:-1]
43 arg, l = argline.split()
43 arg, l = argline.split()
44 if arg not in keys:
44 if arg not in keys:
45 raise error.Abort(_("unexpected parameter %r") % arg)
45 raise error.Abort(_("unexpected parameter %r") % arg)
46 if arg == '*':
46 if arg == '*':
47 star = {}
47 star = {}
48 for k in xrange(int(l)):
48 for k in xrange(int(l)):
49 argline = self.fin.readline()[:-1]
49 argline = self.fin.readline()[:-1]
50 arg, l = argline.split()
50 arg, l = argline.split()
51 val = self.fin.read(int(l))
51 val = self.fin.read(int(l))
52 star[arg] = val
52 star[arg] = val
53 data['*'] = star
53 data['*'] = star
54 else:
54 else:
55 val = self.fin.read(int(l))
55 val = self.fin.read(int(l))
56 data[arg] = val
56 data[arg] = val
57 return [data[k] for k in keys]
57 return [data[k] for k in keys]
58
58
59 def getarg(self, name):
59 def getarg(self, name):
60 return self.getargs(name)[0]
60 return self.getargs(name)[0]
61
61
62 def getfile(self, fpout):
62 def getfile(self, fpout):
63 self.sendresponse('')
63 self.sendresponse('')
64 count = int(self.fin.readline())
64 count = int(self.fin.readline())
65 while count:
65 while count:
66 fpout.write(self.fin.read(count))
66 fpout.write(self.fin.read(count))
67 count = int(self.fin.readline())
67 count = int(self.fin.readline())
68
68
69 def redirect(self):
69 def redirect(self):
70 pass
70 pass
71
71
72 def sendresponse(self, v):
72 def sendresponse(self, v):
73 self.fout.write("%d\n" % len(v))
73 self.fout.write("%d\n" % len(v))
74 self.fout.write(v)
74 self.fout.write(v)
75 self.fout.flush()
75 self.fout.flush()
76
76
77 def sendstream(self, source):
77 def sendstream(self, source):
78 write = self.fout.write
78 write = self.fout.write
79
79 for chunk in source.gen:
80 if source.reader:
81 gen = iter(lambda: source.reader.read(4096), '')
82 else:
83 gen = source.gen
84
85 for chunk in gen:
86 write(chunk)
80 write(chunk)
87 self.fout.flush()
81 self.fout.flush()
88
82
89 def sendpushresponse(self, rsp):
83 def sendpushresponse(self, rsp):
90 self.sendresponse('')
84 self.sendresponse('')
91 self.sendresponse(str(rsp.res))
85 self.sendresponse(str(rsp.res))
92
86
93 def sendpusherror(self, rsp):
87 def sendpusherror(self, rsp):
94 self.sendresponse(rsp.res)
88 self.sendresponse(rsp.res)
95
89
96 def sendooberror(self, rsp):
90 def sendooberror(self, rsp):
97 self.ui.ferr.write('%s\n-\n' % rsp.message)
91 self.ui.ferr.write('%s\n-\n' % rsp.message)
98 self.ui.ferr.flush()
92 self.ui.ferr.flush()
99 self.fout.write('\n')
93 self.fout.write('\n')
100 self.fout.flush()
94 self.fout.flush()
101
95
102 def serve_forever(self):
96 def serve_forever(self):
103 try:
97 try:
104 while self.serve_one():
98 while self.serve_one():
105 pass
99 pass
106 finally:
100 finally:
107 if self.lock is not None:
101 if self.lock is not None:
108 self.lock.release()
102 self.lock.release()
109 sys.exit(0)
103 sys.exit(0)
110
104
111 handlers = {
105 handlers = {
112 str: sendresponse,
106 str: sendresponse,
113 wireproto.streamres: sendstream,
107 wireproto.streamres: sendstream,
114 wireproto.pushres: sendpushresponse,
108 wireproto.pushres: sendpushresponse,
115 wireproto.pusherr: sendpusherror,
109 wireproto.pusherr: sendpusherror,
116 wireproto.ooberror: sendooberror,
110 wireproto.ooberror: sendooberror,
117 }
111 }
118
112
119 def serve_one(self):
113 def serve_one(self):
120 cmd = self.fin.readline()[:-1]
114 cmd = self.fin.readline()[:-1]
121 if cmd and cmd in wireproto.commands:
115 if cmd and cmd in wireproto.commands:
122 rsp = wireproto.dispatch(self.repo, self, cmd)
116 rsp = wireproto.dispatch(self.repo, self, cmd)
123 self.handlers[rsp.__class__](self, rsp)
117 self.handlers[rsp.__class__](self, rsp)
124 elif cmd:
118 elif cmd:
125 impl = getattr(self, 'do_' + cmd, None)
119 impl = getattr(self, 'do_' + cmd, None)
126 if impl:
120 if impl:
127 r = impl()
121 r = impl()
128 if r is not None:
122 if r is not None:
129 self.sendresponse(r)
123 self.sendresponse(r)
130 else:
124 else:
131 self.sendresponse("")
125 self.sendresponse("")
132 return cmd != ''
126 return cmd != ''
133
127
134 def _client(self):
128 def _client(self):
135 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
129 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
136 return 'remote:ssh:' + client
130 return 'remote:ssh:' + client
@@ -1,1056 +1,1057
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import os
11 import os
12 import tempfile
12 import tempfile
13
13
14 from .i18n import _
14 from .i18n import _
15 from .node import (
15 from .node import (
16 bin,
16 bin,
17 hex,
17 hex,
18 nullid,
18 nullid,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 discovery,
24 discovery,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 pycompat,
30 pycompat,
31 repository,
31 repository,
32 streamclone,
32 streamclone,
33 util,
33 util,
34 )
34 )
35
35
36 urlerr = util.urlerr
36 urlerr = util.urlerr
37 urlreq = util.urlreq
37 urlreq = util.urlreq
38
38
39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 'IncompatibleClient')
41 'IncompatibleClient')
42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43
43
44 class abstractserverproto(object):
44 class abstractserverproto(object):
45 """abstract class that summarizes the protocol API
45 """abstract class that summarizes the protocol API
46
46
47 Used as reference and documentation.
47 Used as reference and documentation.
48 """
48 """
49
49
50 def getargs(self, args):
50 def getargs(self, args):
51 """return the value for arguments in <args>
51 """return the value for arguments in <args>
52
52
53 returns a list of values (same order as <args>)"""
53 returns a list of values (same order as <args>)"""
54 raise NotImplementedError()
54 raise NotImplementedError()
55
55
56 def getfile(self, fp):
56 def getfile(self, fp):
57 """write the whole content of a file into a file like object
57 """write the whole content of a file into a file like object
58
58
59 The file is in the form::
59 The file is in the form::
60
60
61 (<chunk-size>\n<chunk>)+0\n
61 (<chunk-size>\n<chunk>)+0\n
62
62
63 chunk size is the ascii version of the int.
63 chunk size is the ascii version of the int.
64 """
64 """
65 raise NotImplementedError()
65 raise NotImplementedError()
66
66
67 def redirect(self):
67 def redirect(self):
68 """may setup interception for stdout and stderr
68 """may setup interception for stdout and stderr
69
69
70 See also the `restore` method."""
70 See also the `restore` method."""
71 raise NotImplementedError()
71 raise NotImplementedError()
72
72
73 # If the `redirect` function does install interception, the `restore`
73 # If the `redirect` function does install interception, the `restore`
74 # function MUST be defined. If interception is not used, this function
74 # function MUST be defined. If interception is not used, this function
75 # MUST NOT be defined.
75 # MUST NOT be defined.
76 #
76 #
77 # left commented here on purpose
77 # left commented here on purpose
78 #
78 #
79 #def restore(self):
79 #def restore(self):
80 # """reinstall previous stdout and stderr and return intercepted stdout
80 # """reinstall previous stdout and stderr and return intercepted stdout
81 # """
81 # """
82 # raise NotImplementedError()
82 # raise NotImplementedError()
83
83
84 class remoteiterbatcher(peer.iterbatcher):
84 class remoteiterbatcher(peer.iterbatcher):
85 def __init__(self, remote):
85 def __init__(self, remote):
86 super(remoteiterbatcher, self).__init__()
86 super(remoteiterbatcher, self).__init__()
87 self._remote = remote
87 self._remote = remote
88
88
89 def __getattr__(self, name):
89 def __getattr__(self, name):
90 # Validate this method is batchable, since submit() only supports
90 # Validate this method is batchable, since submit() only supports
91 # batchable methods.
91 # batchable methods.
92 fn = getattr(self._remote, name)
92 fn = getattr(self._remote, name)
93 if not getattr(fn, 'batchable', None):
93 if not getattr(fn, 'batchable', None):
94 raise error.ProgrammingError('Attempted to batch a non-batchable '
94 raise error.ProgrammingError('Attempted to batch a non-batchable '
95 'call to %r' % name)
95 'call to %r' % name)
96
96
97 return super(remoteiterbatcher, self).__getattr__(name)
97 return super(remoteiterbatcher, self).__getattr__(name)
98
98
99 def submit(self):
99 def submit(self):
100 """Break the batch request into many patch calls and pipeline them.
100 """Break the batch request into many patch calls and pipeline them.
101
101
102 This is mostly valuable over http where request sizes can be
102 This is mostly valuable over http where request sizes can be
103 limited, but can be used in other places as well.
103 limited, but can be used in other places as well.
104 """
104 """
105 # 2-tuple of (command, arguments) that represents what will be
105 # 2-tuple of (command, arguments) that represents what will be
106 # sent over the wire.
106 # sent over the wire.
107 requests = []
107 requests = []
108
108
109 # 4-tuple of (command, final future, @batchable generator, remote
109 # 4-tuple of (command, final future, @batchable generator, remote
110 # future).
110 # future).
111 results = []
111 results = []
112
112
113 for command, args, opts, finalfuture in self.calls:
113 for command, args, opts, finalfuture in self.calls:
114 mtd = getattr(self._remote, command)
114 mtd = getattr(self._remote, command)
115 batchable = mtd.batchable(mtd.__self__, *args, **opts)
115 batchable = mtd.batchable(mtd.__self__, *args, **opts)
116
116
117 commandargs, fremote = next(batchable)
117 commandargs, fremote = next(batchable)
118 assert fremote
118 assert fremote
119 requests.append((command, commandargs))
119 requests.append((command, commandargs))
120 results.append((command, finalfuture, batchable, fremote))
120 results.append((command, finalfuture, batchable, fremote))
121
121
122 if requests:
122 if requests:
123 self._resultiter = self._remote._submitbatch(requests)
123 self._resultiter = self._remote._submitbatch(requests)
124
124
125 self._results = results
125 self._results = results
126
126
127 def results(self):
127 def results(self):
128 for command, finalfuture, batchable, remotefuture in self._results:
128 for command, finalfuture, batchable, remotefuture in self._results:
129 # Get the raw result, set it in the remote future, feed it
129 # Get the raw result, set it in the remote future, feed it
130 # back into the @batchable generator so it can be decoded, and
130 # back into the @batchable generator so it can be decoded, and
131 # set the result on the final future to this value.
131 # set the result on the final future to this value.
132 remoteresult = next(self._resultiter)
132 remoteresult = next(self._resultiter)
133 remotefuture.set(remoteresult)
133 remotefuture.set(remoteresult)
134 finalfuture.set(next(batchable))
134 finalfuture.set(next(batchable))
135
135
136 # Verify our @batchable generators only emit 2 values.
136 # Verify our @batchable generators only emit 2 values.
137 try:
137 try:
138 next(batchable)
138 next(batchable)
139 except StopIteration:
139 except StopIteration:
140 pass
140 pass
141 else:
141 else:
142 raise error.ProgrammingError('%s @batchable generator emitted '
142 raise error.ProgrammingError('%s @batchable generator emitted '
143 'unexpected value count' % command)
143 'unexpected value count' % command)
144
144
145 yield finalfuture.value
145 yield finalfuture.value
146
146
147 # Forward a couple of names from peer to make wireproto interactions
147 # Forward a couple of names from peer to make wireproto interactions
148 # slightly more sensible.
148 # slightly more sensible.
149 batchable = peer.batchable
149 batchable = peer.batchable
150 future = peer.future
150 future = peer.future
151
151
152 # list of nodes encoding / decoding
152 # list of nodes encoding / decoding
153
153
154 def decodelist(l, sep=' '):
154 def decodelist(l, sep=' '):
155 if l:
155 if l:
156 return [bin(v) for v in l.split(sep)]
156 return [bin(v) for v in l.split(sep)]
157 return []
157 return []
158
158
159 def encodelist(l, sep=' '):
159 def encodelist(l, sep=' '):
160 try:
160 try:
161 return sep.join(map(hex, l))
161 return sep.join(map(hex, l))
162 except TypeError:
162 except TypeError:
163 raise
163 raise
164
164
165 # batched call argument encoding
165 # batched call argument encoding
166
166
167 def escapearg(plain):
167 def escapearg(plain):
168 return (plain
168 return (plain
169 .replace(':', ':c')
169 .replace(':', ':c')
170 .replace(',', ':o')
170 .replace(',', ':o')
171 .replace(';', ':s')
171 .replace(';', ':s')
172 .replace('=', ':e'))
172 .replace('=', ':e'))
173
173
174 def unescapearg(escaped):
174 def unescapearg(escaped):
175 return (escaped
175 return (escaped
176 .replace(':e', '=')
176 .replace(':e', '=')
177 .replace(':s', ';')
177 .replace(':s', ';')
178 .replace(':o', ',')
178 .replace(':o', ',')
179 .replace(':c', ':'))
179 .replace(':c', ':'))
180
180
181 def encodebatchcmds(req):
181 def encodebatchcmds(req):
182 """Return a ``cmds`` argument value for the ``batch`` command."""
182 """Return a ``cmds`` argument value for the ``batch`` command."""
183 cmds = []
183 cmds = []
184 for op, argsdict in req:
184 for op, argsdict in req:
185 # Old servers didn't properly unescape argument names. So prevent
185 # Old servers didn't properly unescape argument names. So prevent
186 # the sending of argument names that may not be decoded properly by
186 # the sending of argument names that may not be decoded properly by
187 # servers.
187 # servers.
188 assert all(escapearg(k) == k for k in argsdict)
188 assert all(escapearg(k) == k for k in argsdict)
189
189
190 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
190 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
191 for k, v in argsdict.iteritems())
191 for k, v in argsdict.iteritems())
192 cmds.append('%s %s' % (op, args))
192 cmds.append('%s %s' % (op, args))
193
193
194 return ';'.join(cmds)
194 return ';'.join(cmds)
195
195
196 # mapping of options accepted by getbundle and their types
196 # mapping of options accepted by getbundle and their types
197 #
197 #
198 # Meant to be extended by extensions. It is extensions responsibility to ensure
198 # Meant to be extended by extensions. It is extensions responsibility to ensure
199 # such options are properly processed in exchange.getbundle.
199 # such options are properly processed in exchange.getbundle.
200 #
200 #
201 # supported types are:
201 # supported types are:
202 #
202 #
203 # :nodes: list of binary nodes
203 # :nodes: list of binary nodes
204 # :csv: list of comma-separated values
204 # :csv: list of comma-separated values
205 # :scsv: list of comma-separated values return as set
205 # :scsv: list of comma-separated values return as set
206 # :plain: string with no transformation needed.
206 # :plain: string with no transformation needed.
207 gboptsmap = {'heads': 'nodes',
207 gboptsmap = {'heads': 'nodes',
208 'bookmarks': 'boolean',
208 'bookmarks': 'boolean',
209 'common': 'nodes',
209 'common': 'nodes',
210 'obsmarkers': 'boolean',
210 'obsmarkers': 'boolean',
211 'phases': 'boolean',
211 'phases': 'boolean',
212 'bundlecaps': 'scsv',
212 'bundlecaps': 'scsv',
213 'listkeys': 'csv',
213 'listkeys': 'csv',
214 'cg': 'boolean',
214 'cg': 'boolean',
215 'cbattempted': 'boolean'}
215 'cbattempted': 'boolean'}
216
216
217 # client side
217 # client side
218
218
219 class wirepeer(repository.legacypeer):
219 class wirepeer(repository.legacypeer):
220 """Client-side interface for communicating with a peer repository.
220 """Client-side interface for communicating with a peer repository.
221
221
222 Methods commonly call wire protocol commands of the same name.
222 Methods commonly call wire protocol commands of the same name.
223
223
224 See also httppeer.py and sshpeer.py for protocol-specific
224 See also httppeer.py and sshpeer.py for protocol-specific
225 implementations of this interface.
225 implementations of this interface.
226 """
226 """
227 # Begin of basewirepeer interface.
227 # Begin of basewirepeer interface.
228
228
229 def iterbatch(self):
229 def iterbatch(self):
230 return remoteiterbatcher(self)
230 return remoteiterbatcher(self)
231
231
232 @batchable
232 @batchable
233 def lookup(self, key):
233 def lookup(self, key):
234 self.requirecap('lookup', _('look up remote revision'))
234 self.requirecap('lookup', _('look up remote revision'))
235 f = future()
235 f = future()
236 yield {'key': encoding.fromlocal(key)}, f
236 yield {'key': encoding.fromlocal(key)}, f
237 d = f.value
237 d = f.value
238 success, data = d[:-1].split(" ", 1)
238 success, data = d[:-1].split(" ", 1)
239 if int(success):
239 if int(success):
240 yield bin(data)
240 yield bin(data)
241 else:
241 else:
242 self._abort(error.RepoError(data))
242 self._abort(error.RepoError(data))
243
243
244 @batchable
244 @batchable
245 def heads(self):
245 def heads(self):
246 f = future()
246 f = future()
247 yield {}, f
247 yield {}, f
248 d = f.value
248 d = f.value
249 try:
249 try:
250 yield decodelist(d[:-1])
250 yield decodelist(d[:-1])
251 except ValueError:
251 except ValueError:
252 self._abort(error.ResponseError(_("unexpected response:"), d))
252 self._abort(error.ResponseError(_("unexpected response:"), d))
253
253
254 @batchable
254 @batchable
255 def known(self, nodes):
255 def known(self, nodes):
256 f = future()
256 f = future()
257 yield {'nodes': encodelist(nodes)}, f
257 yield {'nodes': encodelist(nodes)}, f
258 d = f.value
258 d = f.value
259 try:
259 try:
260 yield [bool(int(b)) for b in d]
260 yield [bool(int(b)) for b in d]
261 except ValueError:
261 except ValueError:
262 self._abort(error.ResponseError(_("unexpected response:"), d))
262 self._abort(error.ResponseError(_("unexpected response:"), d))
263
263
264 @batchable
264 @batchable
265 def branchmap(self):
265 def branchmap(self):
266 f = future()
266 f = future()
267 yield {}, f
267 yield {}, f
268 d = f.value
268 d = f.value
269 try:
269 try:
270 branchmap = {}
270 branchmap = {}
271 for branchpart in d.splitlines():
271 for branchpart in d.splitlines():
272 branchname, branchheads = branchpart.split(' ', 1)
272 branchname, branchheads = branchpart.split(' ', 1)
273 branchname = encoding.tolocal(urlreq.unquote(branchname))
273 branchname = encoding.tolocal(urlreq.unquote(branchname))
274 branchheads = decodelist(branchheads)
274 branchheads = decodelist(branchheads)
275 branchmap[branchname] = branchheads
275 branchmap[branchname] = branchheads
276 yield branchmap
276 yield branchmap
277 except TypeError:
277 except TypeError:
278 self._abort(error.ResponseError(_("unexpected response:"), d))
278 self._abort(error.ResponseError(_("unexpected response:"), d))
279
279
280 @batchable
280 @batchable
281 def listkeys(self, namespace):
281 def listkeys(self, namespace):
282 if not self.capable('pushkey'):
282 if not self.capable('pushkey'):
283 yield {}, None
283 yield {}, None
284 f = future()
284 f = future()
285 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
285 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
286 yield {'namespace': encoding.fromlocal(namespace)}, f
286 yield {'namespace': encoding.fromlocal(namespace)}, f
287 d = f.value
287 d = f.value
288 self.ui.debug('received listkey for "%s": %i bytes\n'
288 self.ui.debug('received listkey for "%s": %i bytes\n'
289 % (namespace, len(d)))
289 % (namespace, len(d)))
290 yield pushkeymod.decodekeys(d)
290 yield pushkeymod.decodekeys(d)
291
291
292 @batchable
292 @batchable
293 def pushkey(self, namespace, key, old, new):
293 def pushkey(self, namespace, key, old, new):
294 if not self.capable('pushkey'):
294 if not self.capable('pushkey'):
295 yield False, None
295 yield False, None
296 f = future()
296 f = future()
297 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
297 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
298 yield {'namespace': encoding.fromlocal(namespace),
298 yield {'namespace': encoding.fromlocal(namespace),
299 'key': encoding.fromlocal(key),
299 'key': encoding.fromlocal(key),
300 'old': encoding.fromlocal(old),
300 'old': encoding.fromlocal(old),
301 'new': encoding.fromlocal(new)}, f
301 'new': encoding.fromlocal(new)}, f
302 d = f.value
302 d = f.value
303 d, output = d.split('\n', 1)
303 d, output = d.split('\n', 1)
304 try:
304 try:
305 d = bool(int(d))
305 d = bool(int(d))
306 except ValueError:
306 except ValueError:
307 raise error.ResponseError(
307 raise error.ResponseError(
308 _('push failed (unexpected response):'), d)
308 _('push failed (unexpected response):'), d)
309 for l in output.splitlines(True):
309 for l in output.splitlines(True):
310 self.ui.status(_('remote: '), l)
310 self.ui.status(_('remote: '), l)
311 yield d
311 yield d
312
312
313 def stream_out(self):
313 def stream_out(self):
314 return self._callstream('stream_out')
314 return self._callstream('stream_out')
315
315
316 def getbundle(self, source, **kwargs):
316 def getbundle(self, source, **kwargs):
317 kwargs = pycompat.byteskwargs(kwargs)
317 kwargs = pycompat.byteskwargs(kwargs)
318 self.requirecap('getbundle', _('look up remote changes'))
318 self.requirecap('getbundle', _('look up remote changes'))
319 opts = {}
319 opts = {}
320 bundlecaps = kwargs.get('bundlecaps')
320 bundlecaps = kwargs.get('bundlecaps')
321 if bundlecaps is not None:
321 if bundlecaps is not None:
322 kwargs['bundlecaps'] = sorted(bundlecaps)
322 kwargs['bundlecaps'] = sorted(bundlecaps)
323 else:
323 else:
324 bundlecaps = () # kwargs could have it to None
324 bundlecaps = () # kwargs could have it to None
325 for key, value in kwargs.iteritems():
325 for key, value in kwargs.iteritems():
326 if value is None:
326 if value is None:
327 continue
327 continue
328 keytype = gboptsmap.get(key)
328 keytype = gboptsmap.get(key)
329 if keytype is None:
329 if keytype is None:
330 raise error.ProgrammingError(
330 raise error.ProgrammingError(
331 'Unexpectedly None keytype for key %s' % key)
331 'Unexpectedly None keytype for key %s' % key)
332 elif keytype == 'nodes':
332 elif keytype == 'nodes':
333 value = encodelist(value)
333 value = encodelist(value)
334 elif keytype in ('csv', 'scsv'):
334 elif keytype in ('csv', 'scsv'):
335 value = ','.join(value)
335 value = ','.join(value)
336 elif keytype == 'boolean':
336 elif keytype == 'boolean':
337 value = '%i' % bool(value)
337 value = '%i' % bool(value)
338 elif keytype != 'plain':
338 elif keytype != 'plain':
339 raise KeyError('unknown getbundle option type %s'
339 raise KeyError('unknown getbundle option type %s'
340 % keytype)
340 % keytype)
341 opts[key] = value
341 opts[key] = value
342 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
342 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
343 if any((cap.startswith('HG2') for cap in bundlecaps)):
343 if any((cap.startswith('HG2') for cap in bundlecaps)):
344 return bundle2.getunbundler(self.ui, f)
344 return bundle2.getunbundler(self.ui, f)
345 else:
345 else:
346 return changegroupmod.cg1unpacker(f, 'UN')
346 return changegroupmod.cg1unpacker(f, 'UN')
347
347
348 def unbundle(self, cg, heads, url):
348 def unbundle(self, cg, heads, url):
349 '''Send cg (a readable file-like object representing the
349 '''Send cg (a readable file-like object representing the
350 changegroup to push, typically a chunkbuffer object) to the
350 changegroup to push, typically a chunkbuffer object) to the
351 remote server as a bundle.
351 remote server as a bundle.
352
352
353 When pushing a bundle10 stream, return an integer indicating the
353 When pushing a bundle10 stream, return an integer indicating the
354 result of the push (see changegroup.apply()).
354 result of the push (see changegroup.apply()).
355
355
356 When pushing a bundle20 stream, return a bundle20 stream.
356 When pushing a bundle20 stream, return a bundle20 stream.
357
357
358 `url` is the url the client thinks it's pushing to, which is
358 `url` is the url the client thinks it's pushing to, which is
359 visible to hooks.
359 visible to hooks.
360 '''
360 '''
361
361
362 if heads != ['force'] and self.capable('unbundlehash'):
362 if heads != ['force'] and self.capable('unbundlehash'):
363 heads = encodelist(['hashed',
363 heads = encodelist(['hashed',
364 hashlib.sha1(''.join(sorted(heads))).digest()])
364 hashlib.sha1(''.join(sorted(heads))).digest()])
365 else:
365 else:
366 heads = encodelist(heads)
366 heads = encodelist(heads)
367
367
368 if util.safehasattr(cg, 'deltaheader'):
368 if util.safehasattr(cg, 'deltaheader'):
369 # this a bundle10, do the old style call sequence
369 # this a bundle10, do the old style call sequence
370 ret, output = self._callpush("unbundle", cg, heads=heads)
370 ret, output = self._callpush("unbundle", cg, heads=heads)
371 if ret == "":
371 if ret == "":
372 raise error.ResponseError(
372 raise error.ResponseError(
373 _('push failed:'), output)
373 _('push failed:'), output)
374 try:
374 try:
375 ret = int(ret)
375 ret = int(ret)
376 except ValueError:
376 except ValueError:
377 raise error.ResponseError(
377 raise error.ResponseError(
378 _('push failed (unexpected response):'), ret)
378 _('push failed (unexpected response):'), ret)
379
379
380 for l in output.splitlines(True):
380 for l in output.splitlines(True):
381 self.ui.status(_('remote: '), l)
381 self.ui.status(_('remote: '), l)
382 else:
382 else:
383 # bundle2 push. Send a stream, fetch a stream.
383 # bundle2 push. Send a stream, fetch a stream.
384 stream = self._calltwowaystream('unbundle', cg, heads=heads)
384 stream = self._calltwowaystream('unbundle', cg, heads=heads)
385 ret = bundle2.getunbundler(self.ui, stream)
385 ret = bundle2.getunbundler(self.ui, stream)
386 return ret
386 return ret
387
387
388 # End of basewirepeer interface.
388 # End of basewirepeer interface.
389
389
390 # Begin of baselegacywirepeer interface.
390 # Begin of baselegacywirepeer interface.
391
391
392 def branches(self, nodes):
392 def branches(self, nodes):
393 n = encodelist(nodes)
393 n = encodelist(nodes)
394 d = self._call("branches", nodes=n)
394 d = self._call("branches", nodes=n)
395 try:
395 try:
396 br = [tuple(decodelist(b)) for b in d.splitlines()]
396 br = [tuple(decodelist(b)) for b in d.splitlines()]
397 return br
397 return br
398 except ValueError:
398 except ValueError:
399 self._abort(error.ResponseError(_("unexpected response:"), d))
399 self._abort(error.ResponseError(_("unexpected response:"), d))
400
400
401 def between(self, pairs):
401 def between(self, pairs):
402 batch = 8 # avoid giant requests
402 batch = 8 # avoid giant requests
403 r = []
403 r = []
404 for i in xrange(0, len(pairs), batch):
404 for i in xrange(0, len(pairs), batch):
405 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
405 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
406 d = self._call("between", pairs=n)
406 d = self._call("between", pairs=n)
407 try:
407 try:
408 r.extend(l and decodelist(l) or [] for l in d.splitlines())
408 r.extend(l and decodelist(l) or [] for l in d.splitlines())
409 except ValueError:
409 except ValueError:
410 self._abort(error.ResponseError(_("unexpected response:"), d))
410 self._abort(error.ResponseError(_("unexpected response:"), d))
411 return r
411 return r
412
412
413 def changegroup(self, nodes, kind):
413 def changegroup(self, nodes, kind):
414 n = encodelist(nodes)
414 n = encodelist(nodes)
415 f = self._callcompressable("changegroup", roots=n)
415 f = self._callcompressable("changegroup", roots=n)
416 return changegroupmod.cg1unpacker(f, 'UN')
416 return changegroupmod.cg1unpacker(f, 'UN')
417
417
418 def changegroupsubset(self, bases, heads, kind):
418 def changegroupsubset(self, bases, heads, kind):
419 self.requirecap('changegroupsubset', _('look up remote changes'))
419 self.requirecap('changegroupsubset', _('look up remote changes'))
420 bases = encodelist(bases)
420 bases = encodelist(bases)
421 heads = encodelist(heads)
421 heads = encodelist(heads)
422 f = self._callcompressable("changegroupsubset",
422 f = self._callcompressable("changegroupsubset",
423 bases=bases, heads=heads)
423 bases=bases, heads=heads)
424 return changegroupmod.cg1unpacker(f, 'UN')
424 return changegroupmod.cg1unpacker(f, 'UN')
425
425
426 # End of baselegacywirepeer interface.
426 # End of baselegacywirepeer interface.
427
427
428 def _submitbatch(self, req):
428 def _submitbatch(self, req):
429 """run batch request <req> on the server
429 """run batch request <req> on the server
430
430
431 Returns an iterator of the raw responses from the server.
431 Returns an iterator of the raw responses from the server.
432 """
432 """
433 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
433 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
434 chunk = rsp.read(1024)
434 chunk = rsp.read(1024)
435 work = [chunk]
435 work = [chunk]
436 while chunk:
436 while chunk:
437 while ';' not in chunk and chunk:
437 while ';' not in chunk and chunk:
438 chunk = rsp.read(1024)
438 chunk = rsp.read(1024)
439 work.append(chunk)
439 work.append(chunk)
440 merged = ''.join(work)
440 merged = ''.join(work)
441 while ';' in merged:
441 while ';' in merged:
442 one, merged = merged.split(';', 1)
442 one, merged = merged.split(';', 1)
443 yield unescapearg(one)
443 yield unescapearg(one)
444 chunk = rsp.read(1024)
444 chunk = rsp.read(1024)
445 work = [merged, chunk]
445 work = [merged, chunk]
446 yield unescapearg(''.join(work))
446 yield unescapearg(''.join(work))
447
447
448 def _submitone(self, op, args):
448 def _submitone(self, op, args):
449 return self._call(op, **pycompat.strkwargs(args))
449 return self._call(op, **pycompat.strkwargs(args))
450
450
451 def debugwireargs(self, one, two, three=None, four=None, five=None):
451 def debugwireargs(self, one, two, three=None, four=None, five=None):
452 # don't pass optional arguments left at their default value
452 # don't pass optional arguments left at their default value
453 opts = {}
453 opts = {}
454 if three is not None:
454 if three is not None:
455 opts[r'three'] = three
455 opts[r'three'] = three
456 if four is not None:
456 if four is not None:
457 opts[r'four'] = four
457 opts[r'four'] = four
458 return self._call('debugwireargs', one=one, two=two, **opts)
458 return self._call('debugwireargs', one=one, two=two, **opts)
459
459
460 def _call(self, cmd, **args):
460 def _call(self, cmd, **args):
461 """execute <cmd> on the server
461 """execute <cmd> on the server
462
462
463 The command is expected to return a simple string.
463 The command is expected to return a simple string.
464
464
465 returns the server reply as a string."""
465 returns the server reply as a string."""
466 raise NotImplementedError()
466 raise NotImplementedError()
467
467
468 def _callstream(self, cmd, **args):
468 def _callstream(self, cmd, **args):
469 """execute <cmd> on the server
469 """execute <cmd> on the server
470
470
471 The command is expected to return a stream. Note that if the
471 The command is expected to return a stream. Note that if the
472 command doesn't return a stream, _callstream behaves
472 command doesn't return a stream, _callstream behaves
473 differently for ssh and http peers.
473 differently for ssh and http peers.
474
474
475 returns the server reply as a file like object.
475 returns the server reply as a file like object.
476 """
476 """
477 raise NotImplementedError()
477 raise NotImplementedError()
478
478
479 def _callcompressable(self, cmd, **args):
479 def _callcompressable(self, cmd, **args):
480 """execute <cmd> on the server
480 """execute <cmd> on the server
481
481
482 The command is expected to return a stream.
482 The command is expected to return a stream.
483
483
484 The stream may have been compressed in some implementations. This
484 The stream may have been compressed in some implementations. This
485 function takes care of the decompression. This is the only difference
485 function takes care of the decompression. This is the only difference
486 with _callstream.
486 with _callstream.
487
487
488 returns the server reply as a file like object.
488 returns the server reply as a file like object.
489 """
489 """
490 raise NotImplementedError()
490 raise NotImplementedError()
491
491
492 def _callpush(self, cmd, fp, **args):
492 def _callpush(self, cmd, fp, **args):
493 """execute a <cmd> on server
493 """execute a <cmd> on server
494
494
495 The command is expected to be related to a push. Push has a special
495 The command is expected to be related to a push. Push has a special
496 return method.
496 return method.
497
497
498 returns the server reply as a (ret, output) tuple. ret is either
498 returns the server reply as a (ret, output) tuple. ret is either
499 empty (error) or a stringified int.
499 empty (error) or a stringified int.
500 """
500 """
501 raise NotImplementedError()
501 raise NotImplementedError()
502
502
503 def _calltwowaystream(self, cmd, fp, **args):
503 def _calltwowaystream(self, cmd, fp, **args):
504 """execute <cmd> on server
504 """execute <cmd> on server
505
505
506 The command will send a stream to the server and get a stream in reply.
506 The command will send a stream to the server and get a stream in reply.
507 """
507 """
508 raise NotImplementedError()
508 raise NotImplementedError()
509
509
510 def _abort(self, exception):
510 def _abort(self, exception):
511 """clearly abort the wire protocol connection and raise the exception
511 """clearly abort the wire protocol connection and raise the exception
512 """
512 """
513 raise NotImplementedError()
513 raise NotImplementedError()
514
514
515 # server side
515 # server side
516
516
517 # wire protocol command can either return a string or one of these classes.
517 # wire protocol command can either return a string or one of these classes.
518 class streamres(object):
518 class streamres(object):
519 """wireproto reply: binary stream
519 """wireproto reply: binary stream
520
520
521 The call was successful and the result is a stream.
521 The call was successful and the result is a stream.
522
522
523 Accepts either a generator or an object with a ``read(size)`` method.
523 Accepts a generator containing chunks of data to be sent to the client.
524
524
525 ``v1compressible`` indicates whether this data can be compressed to
525 ``v1compressible`` indicates whether this data can be compressed to
526 "version 1" clients (technically: HTTP peers using
526 "version 1" clients (technically: HTTP peers using
527 application/mercurial-0.1 media type). This flag should NOT be used on
527 application/mercurial-0.1 media type). This flag should NOT be used on
528 new commands because new clients should support a more modern compression
528 new commands because new clients should support a more modern compression
529 mechanism.
529 mechanism.
530 """
530 """
531 def __init__(self, gen=None, reader=None, v1compressible=False):
531 def __init__(self, gen=None, v1compressible=False):
532 self.gen = gen
532 self.gen = gen
533 self.reader = reader
534 self.v1compressible = v1compressible
533 self.v1compressible = v1compressible
535
534
536 class pushres(object):
535 class pushres(object):
537 """wireproto reply: success with simple integer return
536 """wireproto reply: success with simple integer return
538
537
539 The call was successful and returned an integer contained in `self.res`.
538 The call was successful and returned an integer contained in `self.res`.
540 """
539 """
541 def __init__(self, res):
540 def __init__(self, res):
542 self.res = res
541 self.res = res
543
542
544 class pusherr(object):
543 class pusherr(object):
545 """wireproto reply: failure
544 """wireproto reply: failure
546
545
547 The call failed. The `self.res` attribute contains the error message.
546 The call failed. The `self.res` attribute contains the error message.
548 """
547 """
549 def __init__(self, res):
548 def __init__(self, res):
550 self.res = res
549 self.res = res
551
550
552 class ooberror(object):
551 class ooberror(object):
553 """wireproto reply: failure of a batch of operation
552 """wireproto reply: failure of a batch of operation
554
553
555 Something failed during a batch call. The error message is stored in
554 Something failed during a batch call. The error message is stored in
556 `self.message`.
555 `self.message`.
557 """
556 """
558 def __init__(self, message):
557 def __init__(self, message):
559 self.message = message
558 self.message = message
560
559
561 def getdispatchrepo(repo, proto, command):
560 def getdispatchrepo(repo, proto, command):
562 """Obtain the repo used for processing wire protocol commands.
561 """Obtain the repo used for processing wire protocol commands.
563
562
564 The intent of this function is to serve as a monkeypatch point for
563 The intent of this function is to serve as a monkeypatch point for
565 extensions that need commands to operate on different repo views under
564 extensions that need commands to operate on different repo views under
566 specialized circumstances.
565 specialized circumstances.
567 """
566 """
568 return repo.filtered('served')
567 return repo.filtered('served')
569
568
570 def dispatch(repo, proto, command):
569 def dispatch(repo, proto, command):
571 repo = getdispatchrepo(repo, proto, command)
570 repo = getdispatchrepo(repo, proto, command)
572 func, spec = commands[command]
571 func, spec = commands[command]
573 args = proto.getargs(spec)
572 args = proto.getargs(spec)
574 return func(repo, proto, *args)
573 return func(repo, proto, *args)
575
574
576 def options(cmd, keys, others):
575 def options(cmd, keys, others):
577 opts = {}
576 opts = {}
578 for k in keys:
577 for k in keys:
579 if k in others:
578 if k in others:
580 opts[k] = others[k]
579 opts[k] = others[k]
581 del others[k]
580 del others[k]
582 if others:
581 if others:
583 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
582 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
584 % (cmd, ",".join(others)))
583 % (cmd, ",".join(others)))
585 return opts
584 return opts
586
585
587 def bundle1allowed(repo, action):
586 def bundle1allowed(repo, action):
588 """Whether a bundle1 operation is allowed from the server.
587 """Whether a bundle1 operation is allowed from the server.
589
588
590 Priority is:
589 Priority is:
591
590
592 1. server.bundle1gd.<action> (if generaldelta active)
591 1. server.bundle1gd.<action> (if generaldelta active)
593 2. server.bundle1.<action>
592 2. server.bundle1.<action>
594 3. server.bundle1gd (if generaldelta active)
593 3. server.bundle1gd (if generaldelta active)
595 4. server.bundle1
594 4. server.bundle1
596 """
595 """
597 ui = repo.ui
596 ui = repo.ui
598 gd = 'generaldelta' in repo.requirements
597 gd = 'generaldelta' in repo.requirements
599
598
600 if gd:
599 if gd:
601 v = ui.configbool('server', 'bundle1gd.%s' % action)
600 v = ui.configbool('server', 'bundle1gd.%s' % action)
602 if v is not None:
601 if v is not None:
603 return v
602 return v
604
603
605 v = ui.configbool('server', 'bundle1.%s' % action)
604 v = ui.configbool('server', 'bundle1.%s' % action)
606 if v is not None:
605 if v is not None:
607 return v
606 return v
608
607
609 if gd:
608 if gd:
610 v = ui.configbool('server', 'bundle1gd')
609 v = ui.configbool('server', 'bundle1gd')
611 if v is not None:
610 if v is not None:
612 return v
611 return v
613
612
614 return ui.configbool('server', 'bundle1')
613 return ui.configbool('server', 'bundle1')
615
614
616 def supportedcompengines(ui, proto, role):
615 def supportedcompengines(ui, proto, role):
617 """Obtain the list of supported compression engines for a request."""
616 """Obtain the list of supported compression engines for a request."""
618 assert role in (util.CLIENTROLE, util.SERVERROLE)
617 assert role in (util.CLIENTROLE, util.SERVERROLE)
619
618
620 compengines = util.compengines.supportedwireengines(role)
619 compengines = util.compengines.supportedwireengines(role)
621
620
622 # Allow config to override default list and ordering.
621 # Allow config to override default list and ordering.
623 if role == util.SERVERROLE:
622 if role == util.SERVERROLE:
624 configengines = ui.configlist('server', 'compressionengines')
623 configengines = ui.configlist('server', 'compressionengines')
625 config = 'server.compressionengines'
624 config = 'server.compressionengines'
626 else:
625 else:
627 # This is currently implemented mainly to facilitate testing. In most
626 # This is currently implemented mainly to facilitate testing. In most
628 # cases, the server should be in charge of choosing a compression engine
627 # cases, the server should be in charge of choosing a compression engine
629 # because a server has the most to lose from a sub-optimal choice. (e.g.
628 # because a server has the most to lose from a sub-optimal choice. (e.g.
630 # CPU DoS due to an expensive engine or a network DoS due to poor
629 # CPU DoS due to an expensive engine or a network DoS due to poor
631 # compression ratio).
630 # compression ratio).
632 configengines = ui.configlist('experimental',
631 configengines = ui.configlist('experimental',
633 'clientcompressionengines')
632 'clientcompressionengines')
634 config = 'experimental.clientcompressionengines'
633 config = 'experimental.clientcompressionengines'
635
634
636 # No explicit config. Filter out the ones that aren't supposed to be
635 # No explicit config. Filter out the ones that aren't supposed to be
637 # advertised and return default ordering.
636 # advertised and return default ordering.
638 if not configengines:
637 if not configengines:
639 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
638 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
640 return [e for e in compengines
639 return [e for e in compengines
641 if getattr(e.wireprotosupport(), attr) > 0]
640 if getattr(e.wireprotosupport(), attr) > 0]
642
641
643 # If compression engines are listed in the config, assume there is a good
642 # If compression engines are listed in the config, assume there is a good
644 # reason for it (like server operators wanting to achieve specific
643 # reason for it (like server operators wanting to achieve specific
645 # performance characteristics). So fail fast if the config references
644 # performance characteristics). So fail fast if the config references
646 # unusable compression engines.
645 # unusable compression engines.
647 validnames = set(e.name() for e in compengines)
646 validnames = set(e.name() for e in compengines)
648 invalidnames = set(e for e in configengines if e not in validnames)
647 invalidnames = set(e for e in configengines if e not in validnames)
649 if invalidnames:
648 if invalidnames:
650 raise error.Abort(_('invalid compression engine defined in %s: %s') %
649 raise error.Abort(_('invalid compression engine defined in %s: %s') %
651 (config, ', '.join(sorted(invalidnames))))
650 (config, ', '.join(sorted(invalidnames))))
652
651
653 compengines = [e for e in compengines if e.name() in configengines]
652 compengines = [e for e in compengines if e.name() in configengines]
654 compengines = sorted(compengines,
653 compengines = sorted(compengines,
655 key=lambda e: configengines.index(e.name()))
654 key=lambda e: configengines.index(e.name()))
656
655
657 if not compengines:
656 if not compengines:
658 raise error.Abort(_('%s config option does not specify any known '
657 raise error.Abort(_('%s config option does not specify any known '
659 'compression engines') % config,
658 'compression engines') % config,
660 hint=_('usable compression engines: %s') %
659 hint=_('usable compression engines: %s') %
661 ', '.sorted(validnames))
660 ', '.sorted(validnames))
662
661
663 return compengines
662 return compengines
664
663
665 # list of commands
664 # list of commands
666 commands = {}
665 commands = {}
667
666
668 def wireprotocommand(name, args=''):
667 def wireprotocommand(name, args=''):
669 """decorator for wire protocol command"""
668 """decorator for wire protocol command"""
670 def register(func):
669 def register(func):
671 commands[name] = (func, args)
670 commands[name] = (func, args)
672 return func
671 return func
673 return register
672 return register
674
673
675 @wireprotocommand('batch', 'cmds *')
674 @wireprotocommand('batch', 'cmds *')
676 def batch(repo, proto, cmds, others):
675 def batch(repo, proto, cmds, others):
677 repo = repo.filtered("served")
676 repo = repo.filtered("served")
678 res = []
677 res = []
679 for pair in cmds.split(';'):
678 for pair in cmds.split(';'):
680 op, args = pair.split(' ', 1)
679 op, args = pair.split(' ', 1)
681 vals = {}
680 vals = {}
682 for a in args.split(','):
681 for a in args.split(','):
683 if a:
682 if a:
684 n, v = a.split('=')
683 n, v = a.split('=')
685 vals[unescapearg(n)] = unescapearg(v)
684 vals[unescapearg(n)] = unescapearg(v)
686 func, spec = commands[op]
685 func, spec = commands[op]
687 if spec:
686 if spec:
688 keys = spec.split()
687 keys = spec.split()
689 data = {}
688 data = {}
690 for k in keys:
689 for k in keys:
691 if k == '*':
690 if k == '*':
692 star = {}
691 star = {}
693 for key in vals.keys():
692 for key in vals.keys():
694 if key not in keys:
693 if key not in keys:
695 star[key] = vals[key]
694 star[key] = vals[key]
696 data['*'] = star
695 data['*'] = star
697 else:
696 else:
698 data[k] = vals[k]
697 data[k] = vals[k]
699 result = func(repo, proto, *[data[k] for k in keys])
698 result = func(repo, proto, *[data[k] for k in keys])
700 else:
699 else:
701 result = func(repo, proto)
700 result = func(repo, proto)
702 if isinstance(result, ooberror):
701 if isinstance(result, ooberror):
703 return result
702 return result
704 res.append(escapearg(result))
703 res.append(escapearg(result))
705 return ';'.join(res)
704 return ';'.join(res)
706
705
707 @wireprotocommand('between', 'pairs')
706 @wireprotocommand('between', 'pairs')
708 def between(repo, proto, pairs):
707 def between(repo, proto, pairs):
709 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
708 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
710 r = []
709 r = []
711 for b in repo.between(pairs):
710 for b in repo.between(pairs):
712 r.append(encodelist(b) + "\n")
711 r.append(encodelist(b) + "\n")
713 return "".join(r)
712 return "".join(r)
714
713
715 @wireprotocommand('branchmap')
714 @wireprotocommand('branchmap')
716 def branchmap(repo, proto):
715 def branchmap(repo, proto):
717 branchmap = repo.branchmap()
716 branchmap = repo.branchmap()
718 heads = []
717 heads = []
719 for branch, nodes in branchmap.iteritems():
718 for branch, nodes in branchmap.iteritems():
720 branchname = urlreq.quote(encoding.fromlocal(branch))
719 branchname = urlreq.quote(encoding.fromlocal(branch))
721 branchnodes = encodelist(nodes)
720 branchnodes = encodelist(nodes)
722 heads.append('%s %s' % (branchname, branchnodes))
721 heads.append('%s %s' % (branchname, branchnodes))
723 return '\n'.join(heads)
722 return '\n'.join(heads)
724
723
725 @wireprotocommand('branches', 'nodes')
724 @wireprotocommand('branches', 'nodes')
726 def branches(repo, proto, nodes):
725 def branches(repo, proto, nodes):
727 nodes = decodelist(nodes)
726 nodes = decodelist(nodes)
728 r = []
727 r = []
729 for b in repo.branches(nodes):
728 for b in repo.branches(nodes):
730 r.append(encodelist(b) + "\n")
729 r.append(encodelist(b) + "\n")
731 return "".join(r)
730 return "".join(r)
732
731
733 @wireprotocommand('clonebundles', '')
732 @wireprotocommand('clonebundles', '')
734 def clonebundles(repo, proto):
733 def clonebundles(repo, proto):
735 """Server command for returning info for available bundles to seed clones.
734 """Server command for returning info for available bundles to seed clones.
736
735
737 Clients will parse this response and determine what bundle to fetch.
736 Clients will parse this response and determine what bundle to fetch.
738
737
739 Extensions may wrap this command to filter or dynamically emit data
738 Extensions may wrap this command to filter or dynamically emit data
740 depending on the request. e.g. you could advertise URLs for the closest
739 depending on the request. e.g. you could advertise URLs for the closest
741 data center given the client's IP address.
740 data center given the client's IP address.
742 """
741 """
743 return repo.vfs.tryread('clonebundles.manifest')
742 return repo.vfs.tryread('clonebundles.manifest')
744
743
745 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
744 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
746 'known', 'getbundle', 'unbundlehash', 'batch']
745 'known', 'getbundle', 'unbundlehash', 'batch']
747
746
748 def _capabilities(repo, proto):
747 def _capabilities(repo, proto):
749 """return a list of capabilities for a repo
748 """return a list of capabilities for a repo
750
749
751 This function exists to allow extensions to easily wrap capabilities
750 This function exists to allow extensions to easily wrap capabilities
752 computation
751 computation
753
752
754 - returns a lists: easy to alter
753 - returns a lists: easy to alter
755 - change done here will be propagated to both `capabilities` and `hello`
754 - change done here will be propagated to both `capabilities` and `hello`
756 command without any other action needed.
755 command without any other action needed.
757 """
756 """
758 # copy to prevent modification of the global list
757 # copy to prevent modification of the global list
759 caps = list(wireprotocaps)
758 caps = list(wireprotocaps)
760 if streamclone.allowservergeneration(repo):
759 if streamclone.allowservergeneration(repo):
761 if repo.ui.configbool('server', 'preferuncompressed'):
760 if repo.ui.configbool('server', 'preferuncompressed'):
762 caps.append('stream-preferred')
761 caps.append('stream-preferred')
763 requiredformats = repo.requirements & repo.supportedformats
762 requiredformats = repo.requirements & repo.supportedformats
764 # if our local revlogs are just revlogv1, add 'stream' cap
763 # if our local revlogs are just revlogv1, add 'stream' cap
765 if not requiredformats - {'revlogv1'}:
764 if not requiredformats - {'revlogv1'}:
766 caps.append('stream')
765 caps.append('stream')
767 # otherwise, add 'streamreqs' detailing our local revlog format
766 # otherwise, add 'streamreqs' detailing our local revlog format
768 else:
767 else:
769 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
768 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
770 if repo.ui.configbool('experimental', 'bundle2-advertise'):
769 if repo.ui.configbool('experimental', 'bundle2-advertise'):
771 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
770 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
772 caps.append('bundle2=' + urlreq.quote(capsblob))
771 caps.append('bundle2=' + urlreq.quote(capsblob))
773 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
772 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
774
773
775 if proto.name == 'http':
774 if proto.name == 'http':
776 caps.append('httpheader=%d' %
775 caps.append('httpheader=%d' %
777 repo.ui.configint('server', 'maxhttpheaderlen'))
776 repo.ui.configint('server', 'maxhttpheaderlen'))
778 if repo.ui.configbool('experimental', 'httppostargs'):
777 if repo.ui.configbool('experimental', 'httppostargs'):
779 caps.append('httppostargs')
778 caps.append('httppostargs')
780
779
781 # FUTURE advertise 0.2rx once support is implemented
780 # FUTURE advertise 0.2rx once support is implemented
782 # FUTURE advertise minrx and mintx after consulting config option
781 # FUTURE advertise minrx and mintx after consulting config option
783 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
782 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
784
783
785 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
784 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
786 if compengines:
785 if compengines:
787 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
786 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
788 for e in compengines)
787 for e in compengines)
789 caps.append('compression=%s' % comptypes)
788 caps.append('compression=%s' % comptypes)
790
789
791 return caps
790 return caps
792
791
793 # If you are writing an extension and consider wrapping this function. Wrap
792 # If you are writing an extension and consider wrapping this function. Wrap
794 # `_capabilities` instead.
793 # `_capabilities` instead.
795 @wireprotocommand('capabilities')
794 @wireprotocommand('capabilities')
796 def capabilities(repo, proto):
795 def capabilities(repo, proto):
797 return ' '.join(_capabilities(repo, proto))
796 return ' '.join(_capabilities(repo, proto))
798
797
799 @wireprotocommand('changegroup', 'roots')
798 @wireprotocommand('changegroup', 'roots')
800 def changegroup(repo, proto, roots):
799 def changegroup(repo, proto, roots):
801 nodes = decodelist(roots)
800 nodes = decodelist(roots)
802 outgoing = discovery.outgoing(repo, missingroots=nodes,
801 outgoing = discovery.outgoing(repo, missingroots=nodes,
803 missingheads=repo.heads())
802 missingheads=repo.heads())
804 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
803 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
805 return streamres(reader=cg, v1compressible=True)
804 gen = iter(lambda: cg.read(32768), '')
805 return streamres(gen=gen, v1compressible=True)
806
806
807 @wireprotocommand('changegroupsubset', 'bases heads')
807 @wireprotocommand('changegroupsubset', 'bases heads')
808 def changegroupsubset(repo, proto, bases, heads):
808 def changegroupsubset(repo, proto, bases, heads):
809 bases = decodelist(bases)
809 bases = decodelist(bases)
810 heads = decodelist(heads)
810 heads = decodelist(heads)
811 outgoing = discovery.outgoing(repo, missingroots=bases,
811 outgoing = discovery.outgoing(repo, missingroots=bases,
812 missingheads=heads)
812 missingheads=heads)
813 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
813 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
814 return streamres(reader=cg, v1compressible=True)
814 gen = iter(lambda: cg.read(32768), '')
815 return streamres(gen=gen, v1compressible=True)
815
816
816 @wireprotocommand('debugwireargs', 'one two *')
817 @wireprotocommand('debugwireargs', 'one two *')
817 def debugwireargs(repo, proto, one, two, others):
818 def debugwireargs(repo, proto, one, two, others):
818 # only accept optional args from the known set
819 # only accept optional args from the known set
819 opts = options('debugwireargs', ['three', 'four'], others)
820 opts = options('debugwireargs', ['three', 'four'], others)
820 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
821 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
821
822
822 @wireprotocommand('getbundle', '*')
823 @wireprotocommand('getbundle', '*')
823 def getbundle(repo, proto, others):
824 def getbundle(repo, proto, others):
824 opts = options('getbundle', gboptsmap.keys(), others)
825 opts = options('getbundle', gboptsmap.keys(), others)
825 for k, v in opts.iteritems():
826 for k, v in opts.iteritems():
826 keytype = gboptsmap[k]
827 keytype = gboptsmap[k]
827 if keytype == 'nodes':
828 if keytype == 'nodes':
828 opts[k] = decodelist(v)
829 opts[k] = decodelist(v)
829 elif keytype == 'csv':
830 elif keytype == 'csv':
830 opts[k] = list(v.split(','))
831 opts[k] = list(v.split(','))
831 elif keytype == 'scsv':
832 elif keytype == 'scsv':
832 opts[k] = set(v.split(','))
833 opts[k] = set(v.split(','))
833 elif keytype == 'boolean':
834 elif keytype == 'boolean':
834 # Client should serialize False as '0', which is a non-empty string
835 # Client should serialize False as '0', which is a non-empty string
835 # so it evaluates as a True bool.
836 # so it evaluates as a True bool.
836 if v == '0':
837 if v == '0':
837 opts[k] = False
838 opts[k] = False
838 else:
839 else:
839 opts[k] = bool(v)
840 opts[k] = bool(v)
840 elif keytype != 'plain':
841 elif keytype != 'plain':
841 raise KeyError('unknown getbundle option type %s'
842 raise KeyError('unknown getbundle option type %s'
842 % keytype)
843 % keytype)
843
844
844 if not bundle1allowed(repo, 'pull'):
845 if not bundle1allowed(repo, 'pull'):
845 if not exchange.bundle2requested(opts.get('bundlecaps')):
846 if not exchange.bundle2requested(opts.get('bundlecaps')):
846 if proto.name == 'http':
847 if proto.name == 'http':
847 return ooberror(bundle2required)
848 return ooberror(bundle2required)
848 raise error.Abort(bundle2requiredmain,
849 raise error.Abort(bundle2requiredmain,
849 hint=bundle2requiredhint)
850 hint=bundle2requiredhint)
850
851
851 try:
852 try:
852 if repo.ui.configbool('server', 'disablefullbundle'):
853 if repo.ui.configbool('server', 'disablefullbundle'):
853 # Check to see if this is a full clone.
854 # Check to see if this is a full clone.
854 clheads = set(repo.changelog.heads())
855 clheads = set(repo.changelog.heads())
855 heads = set(opts.get('heads', set()))
856 heads = set(opts.get('heads', set()))
856 common = set(opts.get('common', set()))
857 common = set(opts.get('common', set()))
857 common.discard(nullid)
858 common.discard(nullid)
858 if not common and clheads == heads:
859 if not common and clheads == heads:
859 raise error.Abort(
860 raise error.Abort(
860 _('server has pull-based clones disabled'),
861 _('server has pull-based clones disabled'),
861 hint=_('remove --pull if specified or upgrade Mercurial'))
862 hint=_('remove --pull if specified or upgrade Mercurial'))
862
863
863 chunks = exchange.getbundlechunks(repo, 'serve',
864 chunks = exchange.getbundlechunks(repo, 'serve',
864 **pycompat.strkwargs(opts))
865 **pycompat.strkwargs(opts))
865 except error.Abort as exc:
866 except error.Abort as exc:
866 # cleanly forward Abort error to the client
867 # cleanly forward Abort error to the client
867 if not exchange.bundle2requested(opts.get('bundlecaps')):
868 if not exchange.bundle2requested(opts.get('bundlecaps')):
868 if proto.name == 'http':
869 if proto.name == 'http':
869 return ooberror(str(exc) + '\n')
870 return ooberror(str(exc) + '\n')
870 raise # cannot do better for bundle1 + ssh
871 raise # cannot do better for bundle1 + ssh
871 # bundle2 request expect a bundle2 reply
872 # bundle2 request expect a bundle2 reply
872 bundler = bundle2.bundle20(repo.ui)
873 bundler = bundle2.bundle20(repo.ui)
873 manargs = [('message', str(exc))]
874 manargs = [('message', str(exc))]
874 advargs = []
875 advargs = []
875 if exc.hint is not None:
876 if exc.hint is not None:
876 advargs.append(('hint', exc.hint))
877 advargs.append(('hint', exc.hint))
877 bundler.addpart(bundle2.bundlepart('error:abort',
878 bundler.addpart(bundle2.bundlepart('error:abort',
878 manargs, advargs))
879 manargs, advargs))
879 return streamres(gen=bundler.getchunks(), v1compressible=True)
880 return streamres(gen=bundler.getchunks(), v1compressible=True)
880 return streamres(gen=chunks, v1compressible=True)
881 return streamres(gen=chunks, v1compressible=True)
881
882
882 @wireprotocommand('heads')
883 @wireprotocommand('heads')
883 def heads(repo, proto):
884 def heads(repo, proto):
884 h = repo.heads()
885 h = repo.heads()
885 return encodelist(h) + "\n"
886 return encodelist(h) + "\n"
886
887
887 @wireprotocommand('hello')
888 @wireprotocommand('hello')
888 def hello(repo, proto):
889 def hello(repo, proto):
889 '''the hello command returns a set of lines describing various
890 '''the hello command returns a set of lines describing various
890 interesting things about the server, in an RFC822-like format.
891 interesting things about the server, in an RFC822-like format.
891 Currently the only one defined is "capabilities", which
892 Currently the only one defined is "capabilities", which
892 consists of a line in the form:
893 consists of a line in the form:
893
894
894 capabilities: space separated list of tokens
895 capabilities: space separated list of tokens
895 '''
896 '''
896 return "capabilities: %s\n" % (capabilities(repo, proto))
897 return "capabilities: %s\n" % (capabilities(repo, proto))
897
898
898 @wireprotocommand('listkeys', 'namespace')
899 @wireprotocommand('listkeys', 'namespace')
899 def listkeys(repo, proto, namespace):
900 def listkeys(repo, proto, namespace):
900 d = repo.listkeys(encoding.tolocal(namespace)).items()
901 d = repo.listkeys(encoding.tolocal(namespace)).items()
901 return pushkeymod.encodekeys(d)
902 return pushkeymod.encodekeys(d)
902
903
903 @wireprotocommand('lookup', 'key')
904 @wireprotocommand('lookup', 'key')
904 def lookup(repo, proto, key):
905 def lookup(repo, proto, key):
905 try:
906 try:
906 k = encoding.tolocal(key)
907 k = encoding.tolocal(key)
907 c = repo[k]
908 c = repo[k]
908 r = c.hex()
909 r = c.hex()
909 success = 1
910 success = 1
910 except Exception as inst:
911 except Exception as inst:
911 r = str(inst)
912 r = str(inst)
912 success = 0
913 success = 0
913 return "%d %s\n" % (success, r)
914 return "%d %s\n" % (success, r)
914
915
915 @wireprotocommand('known', 'nodes *')
916 @wireprotocommand('known', 'nodes *')
916 def known(repo, proto, nodes, others):
917 def known(repo, proto, nodes, others):
917 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
918 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
918
919
919 @wireprotocommand('pushkey', 'namespace key old new')
920 @wireprotocommand('pushkey', 'namespace key old new')
920 def pushkey(repo, proto, namespace, key, old, new):
921 def pushkey(repo, proto, namespace, key, old, new):
921 # compatibility with pre-1.8 clients which were accidentally
922 # compatibility with pre-1.8 clients which were accidentally
922 # sending raw binary nodes rather than utf-8-encoded hex
923 # sending raw binary nodes rather than utf-8-encoded hex
923 if len(new) == 20 and util.escapestr(new) != new:
924 if len(new) == 20 and util.escapestr(new) != new:
924 # looks like it could be a binary node
925 # looks like it could be a binary node
925 try:
926 try:
926 new.decode('utf-8')
927 new.decode('utf-8')
927 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
928 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
928 except UnicodeDecodeError:
929 except UnicodeDecodeError:
929 pass # binary, leave unmodified
930 pass # binary, leave unmodified
930 else:
931 else:
931 new = encoding.tolocal(new) # normal path
932 new = encoding.tolocal(new) # normal path
932
933
933 if util.safehasattr(proto, 'restore'):
934 if util.safehasattr(proto, 'restore'):
934
935
935 proto.redirect()
936 proto.redirect()
936
937
937 try:
938 try:
938 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
939 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
939 encoding.tolocal(old), new) or False
940 encoding.tolocal(old), new) or False
940 except error.Abort:
941 except error.Abort:
941 r = False
942 r = False
942
943
943 output = proto.restore()
944 output = proto.restore()
944
945
945 return '%s\n%s' % (int(r), output)
946 return '%s\n%s' % (int(r), output)
946
947
947 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
948 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
948 encoding.tolocal(old), new)
949 encoding.tolocal(old), new)
949 return '%s\n' % int(r)
950 return '%s\n' % int(r)
950
951
951 @wireprotocommand('stream_out')
952 @wireprotocommand('stream_out')
952 def stream(repo, proto):
953 def stream(repo, proto):
953 '''If the server supports streaming clone, it advertises the "stream"
954 '''If the server supports streaming clone, it advertises the "stream"
954 capability with a value representing the version and flags of the repo
955 capability with a value representing the version and flags of the repo
955 it is serving. Client checks to see if it understands the format.
956 it is serving. Client checks to see if it understands the format.
956 '''
957 '''
957 return streamres(streamclone.generatev1wireproto(repo))
958 return streamres(streamclone.generatev1wireproto(repo))
958
959
959 @wireprotocommand('unbundle', 'heads')
960 @wireprotocommand('unbundle', 'heads')
960 def unbundle(repo, proto, heads):
961 def unbundle(repo, proto, heads):
961 their_heads = decodelist(heads)
962 their_heads = decodelist(heads)
962
963
963 try:
964 try:
964 proto.redirect()
965 proto.redirect()
965
966
966 exchange.check_heads(repo, their_heads, 'preparing changes')
967 exchange.check_heads(repo, their_heads, 'preparing changes')
967
968
968 # write bundle data to temporary file because it can be big
969 # write bundle data to temporary file because it can be big
969 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
970 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
970 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
971 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
971 r = 0
972 r = 0
972 try:
973 try:
973 proto.getfile(fp)
974 proto.getfile(fp)
974 fp.seek(0)
975 fp.seek(0)
975 gen = exchange.readbundle(repo.ui, fp, None)
976 gen = exchange.readbundle(repo.ui, fp, None)
976 if (isinstance(gen, changegroupmod.cg1unpacker)
977 if (isinstance(gen, changegroupmod.cg1unpacker)
977 and not bundle1allowed(repo, 'push')):
978 and not bundle1allowed(repo, 'push')):
978 if proto.name == 'http':
979 if proto.name == 'http':
979 # need to special case http because stderr do not get to
980 # need to special case http because stderr do not get to
980 # the http client on failed push so we need to abuse some
981 # the http client on failed push so we need to abuse some
981 # other error type to make sure the message get to the
982 # other error type to make sure the message get to the
982 # user.
983 # user.
983 return ooberror(bundle2required)
984 return ooberror(bundle2required)
984 raise error.Abort(bundle2requiredmain,
985 raise error.Abort(bundle2requiredmain,
985 hint=bundle2requiredhint)
986 hint=bundle2requiredhint)
986
987
987 r = exchange.unbundle(repo, gen, their_heads, 'serve',
988 r = exchange.unbundle(repo, gen, their_heads, 'serve',
988 proto._client())
989 proto._client())
989 if util.safehasattr(r, 'addpart'):
990 if util.safehasattr(r, 'addpart'):
990 # The return looks streamable, we are in the bundle2 case and
991 # The return looks streamable, we are in the bundle2 case and
991 # should return a stream.
992 # should return a stream.
992 return streamres(gen=r.getchunks())
993 return streamres(gen=r.getchunks())
993 return pushres(r)
994 return pushres(r)
994
995
995 finally:
996 finally:
996 fp.close()
997 fp.close()
997 os.unlink(tempname)
998 os.unlink(tempname)
998
999
999 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1000 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1000 # handle non-bundle2 case first
1001 # handle non-bundle2 case first
1001 if not getattr(exc, 'duringunbundle2', False):
1002 if not getattr(exc, 'duringunbundle2', False):
1002 try:
1003 try:
1003 raise
1004 raise
1004 except error.Abort:
1005 except error.Abort:
1005 # The old code we moved used util.stderr directly.
1006 # The old code we moved used util.stderr directly.
1006 # We did not change it to minimise code change.
1007 # We did not change it to minimise code change.
1007 # This need to be moved to something proper.
1008 # This need to be moved to something proper.
1008 # Feel free to do it.
1009 # Feel free to do it.
1009 util.stderr.write("abort: %s\n" % exc)
1010 util.stderr.write("abort: %s\n" % exc)
1010 if exc.hint is not None:
1011 if exc.hint is not None:
1011 util.stderr.write("(%s)\n" % exc.hint)
1012 util.stderr.write("(%s)\n" % exc.hint)
1012 return pushres(0)
1013 return pushres(0)
1013 except error.PushRaced:
1014 except error.PushRaced:
1014 return pusherr(str(exc))
1015 return pusherr(str(exc))
1015
1016
1016 bundler = bundle2.bundle20(repo.ui)
1017 bundler = bundle2.bundle20(repo.ui)
1017 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1018 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1018 bundler.addpart(out)
1019 bundler.addpart(out)
1019 try:
1020 try:
1020 try:
1021 try:
1021 raise
1022 raise
1022 except error.PushkeyFailed as exc:
1023 except error.PushkeyFailed as exc:
1023 # check client caps
1024 # check client caps
1024 remotecaps = getattr(exc, '_replycaps', None)
1025 remotecaps = getattr(exc, '_replycaps', None)
1025 if (remotecaps is not None
1026 if (remotecaps is not None
1026 and 'pushkey' not in remotecaps.get('error', ())):
1027 and 'pushkey' not in remotecaps.get('error', ())):
1027 # no support remote side, fallback to Abort handler.
1028 # no support remote side, fallback to Abort handler.
1028 raise
1029 raise
1029 part = bundler.newpart('error:pushkey')
1030 part = bundler.newpart('error:pushkey')
1030 part.addparam('in-reply-to', exc.partid)
1031 part.addparam('in-reply-to', exc.partid)
1031 if exc.namespace is not None:
1032 if exc.namespace is not None:
1032 part.addparam('namespace', exc.namespace, mandatory=False)
1033 part.addparam('namespace', exc.namespace, mandatory=False)
1033 if exc.key is not None:
1034 if exc.key is not None:
1034 part.addparam('key', exc.key, mandatory=False)
1035 part.addparam('key', exc.key, mandatory=False)
1035 if exc.new is not None:
1036 if exc.new is not None:
1036 part.addparam('new', exc.new, mandatory=False)
1037 part.addparam('new', exc.new, mandatory=False)
1037 if exc.old is not None:
1038 if exc.old is not None:
1038 part.addparam('old', exc.old, mandatory=False)
1039 part.addparam('old', exc.old, mandatory=False)
1039 if exc.ret is not None:
1040 if exc.ret is not None:
1040 part.addparam('ret', exc.ret, mandatory=False)
1041 part.addparam('ret', exc.ret, mandatory=False)
1041 except error.BundleValueError as exc:
1042 except error.BundleValueError as exc:
1042 errpart = bundler.newpart('error:unsupportedcontent')
1043 errpart = bundler.newpart('error:unsupportedcontent')
1043 if exc.parttype is not None:
1044 if exc.parttype is not None:
1044 errpart.addparam('parttype', exc.parttype)
1045 errpart.addparam('parttype', exc.parttype)
1045 if exc.params:
1046 if exc.params:
1046 errpart.addparam('params', '\0'.join(exc.params))
1047 errpart.addparam('params', '\0'.join(exc.params))
1047 except error.Abort as exc:
1048 except error.Abort as exc:
1048 manargs = [('message', str(exc))]
1049 manargs = [('message', str(exc))]
1049 advargs = []
1050 advargs = []
1050 if exc.hint is not None:
1051 if exc.hint is not None:
1051 advargs.append(('hint', exc.hint))
1052 advargs.append(('hint', exc.hint))
1052 bundler.addpart(bundle2.bundlepart('error:abort',
1053 bundler.addpart(bundle2.bundlepart('error:abort',
1053 manargs, advargs))
1054 manargs, advargs))
1054 except error.PushRaced as exc:
1055 except error.PushRaced as exc:
1055 bundler.newpart('error:pushraced', [('message', str(exc))])
1056 bundler.newpart('error:pushraced', [('message', str(exc))])
1056 return streamres(gen=bundler.getchunks())
1057 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now