##// END OF EJS Templates
wireproto: drop support for reader interface from streamres (API)...
Gregory Szorc -
r35723:8cdb671d default
parent child Browse files
Show More
@@ -1,210 +1,207
1 1 #
2 2 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import cgi
11 11 import struct
12 12
13 13 from .common import (
14 14 HTTP_OK,
15 15 )
16 16
17 17 from .. import (
18 18 error,
19 19 pycompat,
20 20 util,
21 21 wireproto,
22 22 )
23 23 stringio = util.stringio
24 24
25 25 urlerr = util.urlerr
26 26 urlreq = util.urlreq
27 27
28 28 HGTYPE = 'application/mercurial-0.1'
29 29 HGTYPE2 = 'application/mercurial-0.2'
30 30 HGERRTYPE = 'application/hg-error'
31 31
32 32 def decodevaluefromheaders(req, headerprefix):
33 33 """Decode a long value from multiple HTTP request headers.
34 34
35 35 Returns the value as a bytes, not a str.
36 36 """
37 37 chunks = []
38 38 i = 1
39 39 prefix = headerprefix.upper().replace(r'-', r'_')
40 40 while True:
41 41 v = req.env.get(r'HTTP_%s_%d' % (prefix, i))
42 42 if v is None:
43 43 break
44 44 chunks.append(pycompat.bytesurl(v))
45 45 i += 1
46 46
47 47 return ''.join(chunks)
48 48
49 49 class webproto(wireproto.abstractserverproto):
50 50 def __init__(self, req, ui):
51 51 self.req = req
52 52 self.response = ''
53 53 self.ui = ui
54 54 self.name = 'http'
55 55
56 56 def getargs(self, args):
57 57 knownargs = self._args()
58 58 data = {}
59 59 keys = args.split()
60 60 for k in keys:
61 61 if k == '*':
62 62 star = {}
63 63 for key in knownargs.keys():
64 64 if key != 'cmd' and key not in keys:
65 65 star[key] = knownargs[key][0]
66 66 data['*'] = star
67 67 else:
68 68 data[k] = knownargs[k][0]
69 69 return [data[k] for k in keys]
70 70 def _args(self):
71 71 args = self.req.form.copy()
72 72 if pycompat.ispy3:
73 73 args = {k.encode('ascii'): [v.encode('ascii') for v in vs]
74 74 for k, vs in args.items()}
75 75 postlen = int(self.req.env.get(r'HTTP_X_HGARGS_POST', 0))
76 76 if postlen:
77 77 args.update(cgi.parse_qs(
78 78 self.req.read(postlen), keep_blank_values=True))
79 79 return args
80 80
81 81 argvalue = decodevaluefromheaders(self.req, r'X-HgArg')
82 82 args.update(cgi.parse_qs(argvalue, keep_blank_values=True))
83 83 return args
84 84 def getfile(self, fp):
85 85 length = int(self.req.env[r'CONTENT_LENGTH'])
86 86 # If httppostargs is used, we need to read Content-Length
87 87 # minus the amount that was consumed by args.
88 88 length -= int(self.req.env.get(r'HTTP_X_HGARGS_POST', 0))
89 89 for s in util.filechunkiter(self.req, limit=length):
90 90 fp.write(s)
91 91 def redirect(self):
92 92 self.oldio = self.ui.fout, self.ui.ferr
93 93 self.ui.ferr = self.ui.fout = stringio()
94 94 def restore(self):
95 95 val = self.ui.fout.getvalue()
96 96 self.ui.ferr, self.ui.fout = self.oldio
97 97 return val
98 98
99 99 def _client(self):
100 100 return 'remote:%s:%s:%s' % (
101 101 self.req.env.get('wsgi.url_scheme') or 'http',
102 102 urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
103 103 urlreq.quote(self.req.env.get('REMOTE_USER', '')))
104 104
105 105 def responsetype(self, v1compressible=False):
106 106 """Determine the appropriate response type and compression settings.
107 107
108 108 The ``v1compressible`` argument states whether the response with
109 109 application/mercurial-0.1 media types should be zlib compressed.
110 110
111 111 Returns a tuple of (mediatype, compengine, engineopts).
112 112 """
113 113 # For now, if it isn't compressible in the old world, it's never
114 114 # compressible. We can change this to send uncompressed 0.2 payloads
115 115 # later.
116 116 if not v1compressible:
117 117 return HGTYPE, None, None
118 118
119 119 # Determine the response media type and compression engine based
120 120 # on the request parameters.
121 121 protocaps = decodevaluefromheaders(self.req, r'X-HgProto').split(' ')
122 122
123 123 if '0.2' in protocaps:
124 124 # Default as defined by wire protocol spec.
125 125 compformats = ['zlib', 'none']
126 126 for cap in protocaps:
127 127 if cap.startswith('comp='):
128 128 compformats = cap[5:].split(',')
129 129 break
130 130
131 131 # Now find an agreed upon compression format.
132 132 for engine in wireproto.supportedcompengines(self.ui, self,
133 133 util.SERVERROLE):
134 134 if engine.wireprotosupport().name in compformats:
135 135 opts = {}
136 136 level = self.ui.configint('server',
137 137 '%slevel' % engine.name())
138 138 if level is not None:
139 139 opts['level'] = level
140 140
141 141 return HGTYPE2, engine, opts
142 142
143 143 # No mutually supported compression format. Fall back to the
144 144 # legacy protocol.
145 145
146 146 # Don't allow untrusted settings because disabling compression or
147 147 # setting a very high compression level could lead to flooding
148 148 # the server's network or CPU.
149 149 opts = {'level': self.ui.configint('server', 'zliblevel')}
150 150 return HGTYPE, util.compengines['zlib'], opts
151 151
152 152 def iscmd(cmd):
153 153 return cmd in wireproto.commands
154 154
155 155 def call(repo, req, cmd):
156 156 p = webproto(req, repo.ui)
157 157
158 158 def genversion2(gen, compress, engine, engineopts):
159 159 # application/mercurial-0.2 always sends a payload header
160 160 # identifying the compression engine.
161 161 name = engine.wireprotosupport().name
162 162 assert 0 < len(name) < 256
163 163 yield struct.pack('B', len(name))
164 164 yield name
165 165
166 166 if compress:
167 167 for chunk in engine.compressstream(gen, opts=engineopts):
168 168 yield chunk
169 169 else:
170 170 for chunk in gen:
171 171 yield chunk
172 172
173 173 rsp = wireproto.dispatch(repo, p, cmd)
174 174 if isinstance(rsp, bytes):
175 175 req.respond(HTTP_OK, HGTYPE, body=rsp)
176 176 return []
177 177 elif isinstance(rsp, wireproto.streamres):
178 if rsp.reader:
179 gen = iter(lambda: rsp.reader.read(32768), '')
180 else:
181 178 gen = rsp.gen
182 179
183 180 # This code for compression should not be streamres specific. It
184 181 # is here because we only compress streamres at the moment.
185 182 mediatype, engine, engineopts = p.responsetype(rsp.v1compressible)
186 183
187 184 if mediatype == HGTYPE and rsp.v1compressible:
188 185 gen = engine.compressstream(gen, engineopts)
189 186 elif mediatype == HGTYPE2:
190 187 gen = genversion2(gen, rsp.v1compressible, engine, engineopts)
191 188
192 189 req.respond(HTTP_OK, mediatype)
193 190 return gen
194 191 elif isinstance(rsp, wireproto.pushres):
195 192 val = p.restore()
196 193 rsp = '%d\n%s' % (rsp.res, val)
197 194 req.respond(HTTP_OK, HGTYPE, body=rsp)
198 195 return []
199 196 elif isinstance(rsp, wireproto.pusherr):
200 197 # drain the incoming bundle
201 198 req.drain()
202 199 p.restore()
203 200 rsp = '0\n%s\n' % rsp.res
204 201 req.respond(HTTP_OK, HGTYPE, body=rsp)
205 202 return []
206 203 elif isinstance(rsp, wireproto.ooberror):
207 204 rsp = rsp.message
208 205 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
209 206 return []
210 207 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
@@ -1,136 +1,130
1 1 # sshserver.py - ssh protocol server support for mercurial
2 2 #
3 3 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
4 4 # Copyright 2006 Vadim Gelfer <vadim.gelfer@gmail.com>
5 5 #
6 6 # This software may be used and distributed according to the terms of the
7 7 # GNU General Public License version 2 or any later version.
8 8
9 9 from __future__ import absolute_import
10 10
11 11 import sys
12 12
13 13 from .i18n import _
14 14 from . import (
15 15 encoding,
16 16 error,
17 17 hook,
18 18 util,
19 19 wireproto,
20 20 )
21 21
22 22 class sshserver(wireproto.abstractserverproto):
23 23 def __init__(self, ui, repo):
24 24 self.ui = ui
25 25 self.repo = repo
26 26 self.lock = None
27 27 self.fin = ui.fin
28 28 self.fout = ui.fout
29 29 self.name = 'ssh'
30 30
31 31 hook.redirect(True)
32 32 ui.fout = repo.ui.fout = ui.ferr
33 33
34 34 # Prevent insertion/deletion of CRs
35 35 util.setbinary(self.fin)
36 36 util.setbinary(self.fout)
37 37
38 38 def getargs(self, args):
39 39 data = {}
40 40 keys = args.split()
41 41 for n in xrange(len(keys)):
42 42 argline = self.fin.readline()[:-1]
43 43 arg, l = argline.split()
44 44 if arg not in keys:
45 45 raise error.Abort(_("unexpected parameter %r") % arg)
46 46 if arg == '*':
47 47 star = {}
48 48 for k in xrange(int(l)):
49 49 argline = self.fin.readline()[:-1]
50 50 arg, l = argline.split()
51 51 val = self.fin.read(int(l))
52 52 star[arg] = val
53 53 data['*'] = star
54 54 else:
55 55 val = self.fin.read(int(l))
56 56 data[arg] = val
57 57 return [data[k] for k in keys]
58 58
59 59 def getarg(self, name):
60 60 return self.getargs(name)[0]
61 61
62 62 def getfile(self, fpout):
63 63 self.sendresponse('')
64 64 count = int(self.fin.readline())
65 65 while count:
66 66 fpout.write(self.fin.read(count))
67 67 count = int(self.fin.readline())
68 68
69 69 def redirect(self):
70 70 pass
71 71
72 72 def sendresponse(self, v):
73 73 self.fout.write("%d\n" % len(v))
74 74 self.fout.write(v)
75 75 self.fout.flush()
76 76
77 77 def sendstream(self, source):
78 78 write = self.fout.write
79
80 if source.reader:
81 gen = iter(lambda: source.reader.read(4096), '')
82 else:
83 gen = source.gen
84
85 for chunk in gen:
79 for chunk in source.gen:
86 80 write(chunk)
87 81 self.fout.flush()
88 82
89 83 def sendpushresponse(self, rsp):
90 84 self.sendresponse('')
91 85 self.sendresponse(str(rsp.res))
92 86
93 87 def sendpusherror(self, rsp):
94 88 self.sendresponse(rsp.res)
95 89
96 90 def sendooberror(self, rsp):
97 91 self.ui.ferr.write('%s\n-\n' % rsp.message)
98 92 self.ui.ferr.flush()
99 93 self.fout.write('\n')
100 94 self.fout.flush()
101 95
102 96 def serve_forever(self):
103 97 try:
104 98 while self.serve_one():
105 99 pass
106 100 finally:
107 101 if self.lock is not None:
108 102 self.lock.release()
109 103 sys.exit(0)
110 104
111 105 handlers = {
112 106 str: sendresponse,
113 107 wireproto.streamres: sendstream,
114 108 wireproto.pushres: sendpushresponse,
115 109 wireproto.pusherr: sendpusherror,
116 110 wireproto.ooberror: sendooberror,
117 111 }
118 112
119 113 def serve_one(self):
120 114 cmd = self.fin.readline()[:-1]
121 115 if cmd and cmd in wireproto.commands:
122 116 rsp = wireproto.dispatch(self.repo, self, cmd)
123 117 self.handlers[rsp.__class__](self, rsp)
124 118 elif cmd:
125 119 impl = getattr(self, 'do_' + cmd, None)
126 120 if impl:
127 121 r = impl()
128 122 if r is not None:
129 123 self.sendresponse(r)
130 124 else:
131 125 self.sendresponse("")
132 126 return cmd != ''
133 127
134 128 def _client(self):
135 129 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
136 130 return 'remote:ssh:' + client
@@ -1,1056 +1,1057
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 discovery,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 repository,
32 32 streamclone,
33 33 util,
34 34 )
35 35
36 36 urlerr = util.urlerr
37 37 urlreq = util.urlreq
38 38
39 39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 41 'IncompatibleClient')
42 42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43 43
44 44 class abstractserverproto(object):
45 45 """abstract class that summarizes the protocol API
46 46
47 47 Used as reference and documentation.
48 48 """
49 49
50 50 def getargs(self, args):
51 51 """return the value for arguments in <args>
52 52
53 53 returns a list of values (same order as <args>)"""
54 54 raise NotImplementedError()
55 55
56 56 def getfile(self, fp):
57 57 """write the whole content of a file into a file like object
58 58
59 59 The file is in the form::
60 60
61 61 (<chunk-size>\n<chunk>)+0\n
62 62
63 63 chunk size is the ascii version of the int.
64 64 """
65 65 raise NotImplementedError()
66 66
67 67 def redirect(self):
68 68 """may setup interception for stdout and stderr
69 69
70 70 See also the `restore` method."""
71 71 raise NotImplementedError()
72 72
73 73 # If the `redirect` function does install interception, the `restore`
74 74 # function MUST be defined. If interception is not used, this function
75 75 # MUST NOT be defined.
76 76 #
77 77 # left commented here on purpose
78 78 #
79 79 #def restore(self):
80 80 # """reinstall previous stdout and stderr and return intercepted stdout
81 81 # """
82 82 # raise NotImplementedError()
83 83
84 84 class remoteiterbatcher(peer.iterbatcher):
85 85 def __init__(self, remote):
86 86 super(remoteiterbatcher, self).__init__()
87 87 self._remote = remote
88 88
89 89 def __getattr__(self, name):
90 90 # Validate this method is batchable, since submit() only supports
91 91 # batchable methods.
92 92 fn = getattr(self._remote, name)
93 93 if not getattr(fn, 'batchable', None):
94 94 raise error.ProgrammingError('Attempted to batch a non-batchable '
95 95 'call to %r' % name)
96 96
97 97 return super(remoteiterbatcher, self).__getattr__(name)
98 98
99 99 def submit(self):
100 100 """Break the batch request into many patch calls and pipeline them.
101 101
102 102 This is mostly valuable over http where request sizes can be
103 103 limited, but can be used in other places as well.
104 104 """
105 105 # 2-tuple of (command, arguments) that represents what will be
106 106 # sent over the wire.
107 107 requests = []
108 108
109 109 # 4-tuple of (command, final future, @batchable generator, remote
110 110 # future).
111 111 results = []
112 112
113 113 for command, args, opts, finalfuture in self.calls:
114 114 mtd = getattr(self._remote, command)
115 115 batchable = mtd.batchable(mtd.__self__, *args, **opts)
116 116
117 117 commandargs, fremote = next(batchable)
118 118 assert fremote
119 119 requests.append((command, commandargs))
120 120 results.append((command, finalfuture, batchable, fremote))
121 121
122 122 if requests:
123 123 self._resultiter = self._remote._submitbatch(requests)
124 124
125 125 self._results = results
126 126
127 127 def results(self):
128 128 for command, finalfuture, batchable, remotefuture in self._results:
129 129 # Get the raw result, set it in the remote future, feed it
130 130 # back into the @batchable generator so it can be decoded, and
131 131 # set the result on the final future to this value.
132 132 remoteresult = next(self._resultiter)
133 133 remotefuture.set(remoteresult)
134 134 finalfuture.set(next(batchable))
135 135
136 136 # Verify our @batchable generators only emit 2 values.
137 137 try:
138 138 next(batchable)
139 139 except StopIteration:
140 140 pass
141 141 else:
142 142 raise error.ProgrammingError('%s @batchable generator emitted '
143 143 'unexpected value count' % command)
144 144
145 145 yield finalfuture.value
146 146
147 147 # Forward a couple of names from peer to make wireproto interactions
148 148 # slightly more sensible.
149 149 batchable = peer.batchable
150 150 future = peer.future
151 151
152 152 # list of nodes encoding / decoding
153 153
154 154 def decodelist(l, sep=' '):
155 155 if l:
156 156 return [bin(v) for v in l.split(sep)]
157 157 return []
158 158
159 159 def encodelist(l, sep=' '):
160 160 try:
161 161 return sep.join(map(hex, l))
162 162 except TypeError:
163 163 raise
164 164
165 165 # batched call argument encoding
166 166
167 167 def escapearg(plain):
168 168 return (plain
169 169 .replace(':', ':c')
170 170 .replace(',', ':o')
171 171 .replace(';', ':s')
172 172 .replace('=', ':e'))
173 173
174 174 def unescapearg(escaped):
175 175 return (escaped
176 176 .replace(':e', '=')
177 177 .replace(':s', ';')
178 178 .replace(':o', ',')
179 179 .replace(':c', ':'))
180 180
181 181 def encodebatchcmds(req):
182 182 """Return a ``cmds`` argument value for the ``batch`` command."""
183 183 cmds = []
184 184 for op, argsdict in req:
185 185 # Old servers didn't properly unescape argument names. So prevent
186 186 # the sending of argument names that may not be decoded properly by
187 187 # servers.
188 188 assert all(escapearg(k) == k for k in argsdict)
189 189
190 190 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
191 191 for k, v in argsdict.iteritems())
192 192 cmds.append('%s %s' % (op, args))
193 193
194 194 return ';'.join(cmds)
195 195
196 196 # mapping of options accepted by getbundle and their types
197 197 #
198 198 # Meant to be extended by extensions. It is extensions responsibility to ensure
199 199 # such options are properly processed in exchange.getbundle.
200 200 #
201 201 # supported types are:
202 202 #
203 203 # :nodes: list of binary nodes
204 204 # :csv: list of comma-separated values
205 205 # :scsv: list of comma-separated values return as set
206 206 # :plain: string with no transformation needed.
207 207 gboptsmap = {'heads': 'nodes',
208 208 'bookmarks': 'boolean',
209 209 'common': 'nodes',
210 210 'obsmarkers': 'boolean',
211 211 'phases': 'boolean',
212 212 'bundlecaps': 'scsv',
213 213 'listkeys': 'csv',
214 214 'cg': 'boolean',
215 215 'cbattempted': 'boolean'}
216 216
217 217 # client side
218 218
219 219 class wirepeer(repository.legacypeer):
220 220 """Client-side interface for communicating with a peer repository.
221 221
222 222 Methods commonly call wire protocol commands of the same name.
223 223
224 224 See also httppeer.py and sshpeer.py for protocol-specific
225 225 implementations of this interface.
226 226 """
227 227 # Begin of basewirepeer interface.
228 228
229 229 def iterbatch(self):
230 230 return remoteiterbatcher(self)
231 231
232 232 @batchable
233 233 def lookup(self, key):
234 234 self.requirecap('lookup', _('look up remote revision'))
235 235 f = future()
236 236 yield {'key': encoding.fromlocal(key)}, f
237 237 d = f.value
238 238 success, data = d[:-1].split(" ", 1)
239 239 if int(success):
240 240 yield bin(data)
241 241 else:
242 242 self._abort(error.RepoError(data))
243 243
244 244 @batchable
245 245 def heads(self):
246 246 f = future()
247 247 yield {}, f
248 248 d = f.value
249 249 try:
250 250 yield decodelist(d[:-1])
251 251 except ValueError:
252 252 self._abort(error.ResponseError(_("unexpected response:"), d))
253 253
254 254 @batchable
255 255 def known(self, nodes):
256 256 f = future()
257 257 yield {'nodes': encodelist(nodes)}, f
258 258 d = f.value
259 259 try:
260 260 yield [bool(int(b)) for b in d]
261 261 except ValueError:
262 262 self._abort(error.ResponseError(_("unexpected response:"), d))
263 263
264 264 @batchable
265 265 def branchmap(self):
266 266 f = future()
267 267 yield {}, f
268 268 d = f.value
269 269 try:
270 270 branchmap = {}
271 271 for branchpart in d.splitlines():
272 272 branchname, branchheads = branchpart.split(' ', 1)
273 273 branchname = encoding.tolocal(urlreq.unquote(branchname))
274 274 branchheads = decodelist(branchheads)
275 275 branchmap[branchname] = branchheads
276 276 yield branchmap
277 277 except TypeError:
278 278 self._abort(error.ResponseError(_("unexpected response:"), d))
279 279
280 280 @batchable
281 281 def listkeys(self, namespace):
282 282 if not self.capable('pushkey'):
283 283 yield {}, None
284 284 f = future()
285 285 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
286 286 yield {'namespace': encoding.fromlocal(namespace)}, f
287 287 d = f.value
288 288 self.ui.debug('received listkey for "%s": %i bytes\n'
289 289 % (namespace, len(d)))
290 290 yield pushkeymod.decodekeys(d)
291 291
292 292 @batchable
293 293 def pushkey(self, namespace, key, old, new):
294 294 if not self.capable('pushkey'):
295 295 yield False, None
296 296 f = future()
297 297 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
298 298 yield {'namespace': encoding.fromlocal(namespace),
299 299 'key': encoding.fromlocal(key),
300 300 'old': encoding.fromlocal(old),
301 301 'new': encoding.fromlocal(new)}, f
302 302 d = f.value
303 303 d, output = d.split('\n', 1)
304 304 try:
305 305 d = bool(int(d))
306 306 except ValueError:
307 307 raise error.ResponseError(
308 308 _('push failed (unexpected response):'), d)
309 309 for l in output.splitlines(True):
310 310 self.ui.status(_('remote: '), l)
311 311 yield d
312 312
313 313 def stream_out(self):
314 314 return self._callstream('stream_out')
315 315
316 316 def getbundle(self, source, **kwargs):
317 317 kwargs = pycompat.byteskwargs(kwargs)
318 318 self.requirecap('getbundle', _('look up remote changes'))
319 319 opts = {}
320 320 bundlecaps = kwargs.get('bundlecaps')
321 321 if bundlecaps is not None:
322 322 kwargs['bundlecaps'] = sorted(bundlecaps)
323 323 else:
324 324 bundlecaps = () # kwargs could have it to None
325 325 for key, value in kwargs.iteritems():
326 326 if value is None:
327 327 continue
328 328 keytype = gboptsmap.get(key)
329 329 if keytype is None:
330 330 raise error.ProgrammingError(
331 331 'Unexpectedly None keytype for key %s' % key)
332 332 elif keytype == 'nodes':
333 333 value = encodelist(value)
334 334 elif keytype in ('csv', 'scsv'):
335 335 value = ','.join(value)
336 336 elif keytype == 'boolean':
337 337 value = '%i' % bool(value)
338 338 elif keytype != 'plain':
339 339 raise KeyError('unknown getbundle option type %s'
340 340 % keytype)
341 341 opts[key] = value
342 342 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
343 343 if any((cap.startswith('HG2') for cap in bundlecaps)):
344 344 return bundle2.getunbundler(self.ui, f)
345 345 else:
346 346 return changegroupmod.cg1unpacker(f, 'UN')
347 347
348 348 def unbundle(self, cg, heads, url):
349 349 '''Send cg (a readable file-like object representing the
350 350 changegroup to push, typically a chunkbuffer object) to the
351 351 remote server as a bundle.
352 352
353 353 When pushing a bundle10 stream, return an integer indicating the
354 354 result of the push (see changegroup.apply()).
355 355
356 356 When pushing a bundle20 stream, return a bundle20 stream.
357 357
358 358 `url` is the url the client thinks it's pushing to, which is
359 359 visible to hooks.
360 360 '''
361 361
362 362 if heads != ['force'] and self.capable('unbundlehash'):
363 363 heads = encodelist(['hashed',
364 364 hashlib.sha1(''.join(sorted(heads))).digest()])
365 365 else:
366 366 heads = encodelist(heads)
367 367
368 368 if util.safehasattr(cg, 'deltaheader'):
369 369 # this a bundle10, do the old style call sequence
370 370 ret, output = self._callpush("unbundle", cg, heads=heads)
371 371 if ret == "":
372 372 raise error.ResponseError(
373 373 _('push failed:'), output)
374 374 try:
375 375 ret = int(ret)
376 376 except ValueError:
377 377 raise error.ResponseError(
378 378 _('push failed (unexpected response):'), ret)
379 379
380 380 for l in output.splitlines(True):
381 381 self.ui.status(_('remote: '), l)
382 382 else:
383 383 # bundle2 push. Send a stream, fetch a stream.
384 384 stream = self._calltwowaystream('unbundle', cg, heads=heads)
385 385 ret = bundle2.getunbundler(self.ui, stream)
386 386 return ret
387 387
388 388 # End of basewirepeer interface.
389 389
390 390 # Begin of baselegacywirepeer interface.
391 391
392 392 def branches(self, nodes):
393 393 n = encodelist(nodes)
394 394 d = self._call("branches", nodes=n)
395 395 try:
396 396 br = [tuple(decodelist(b)) for b in d.splitlines()]
397 397 return br
398 398 except ValueError:
399 399 self._abort(error.ResponseError(_("unexpected response:"), d))
400 400
401 401 def between(self, pairs):
402 402 batch = 8 # avoid giant requests
403 403 r = []
404 404 for i in xrange(0, len(pairs), batch):
405 405 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
406 406 d = self._call("between", pairs=n)
407 407 try:
408 408 r.extend(l and decodelist(l) or [] for l in d.splitlines())
409 409 except ValueError:
410 410 self._abort(error.ResponseError(_("unexpected response:"), d))
411 411 return r
412 412
413 413 def changegroup(self, nodes, kind):
414 414 n = encodelist(nodes)
415 415 f = self._callcompressable("changegroup", roots=n)
416 416 return changegroupmod.cg1unpacker(f, 'UN')
417 417
418 418 def changegroupsubset(self, bases, heads, kind):
419 419 self.requirecap('changegroupsubset', _('look up remote changes'))
420 420 bases = encodelist(bases)
421 421 heads = encodelist(heads)
422 422 f = self._callcompressable("changegroupsubset",
423 423 bases=bases, heads=heads)
424 424 return changegroupmod.cg1unpacker(f, 'UN')
425 425
426 426 # End of baselegacywirepeer interface.
427 427
428 428 def _submitbatch(self, req):
429 429 """run batch request <req> on the server
430 430
431 431 Returns an iterator of the raw responses from the server.
432 432 """
433 433 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
434 434 chunk = rsp.read(1024)
435 435 work = [chunk]
436 436 while chunk:
437 437 while ';' not in chunk and chunk:
438 438 chunk = rsp.read(1024)
439 439 work.append(chunk)
440 440 merged = ''.join(work)
441 441 while ';' in merged:
442 442 one, merged = merged.split(';', 1)
443 443 yield unescapearg(one)
444 444 chunk = rsp.read(1024)
445 445 work = [merged, chunk]
446 446 yield unescapearg(''.join(work))
447 447
448 448 def _submitone(self, op, args):
449 449 return self._call(op, **pycompat.strkwargs(args))
450 450
451 451 def debugwireargs(self, one, two, three=None, four=None, five=None):
452 452 # don't pass optional arguments left at their default value
453 453 opts = {}
454 454 if three is not None:
455 455 opts[r'three'] = three
456 456 if four is not None:
457 457 opts[r'four'] = four
458 458 return self._call('debugwireargs', one=one, two=two, **opts)
459 459
460 460 def _call(self, cmd, **args):
461 461 """execute <cmd> on the server
462 462
463 463 The command is expected to return a simple string.
464 464
465 465 returns the server reply as a string."""
466 466 raise NotImplementedError()
467 467
468 468 def _callstream(self, cmd, **args):
469 469 """execute <cmd> on the server
470 470
471 471 The command is expected to return a stream. Note that if the
472 472 command doesn't return a stream, _callstream behaves
473 473 differently for ssh and http peers.
474 474
475 475 returns the server reply as a file like object.
476 476 """
477 477 raise NotImplementedError()
478 478
479 479 def _callcompressable(self, cmd, **args):
480 480 """execute <cmd> on the server
481 481
482 482 The command is expected to return a stream.
483 483
484 484 The stream may have been compressed in some implementations. This
485 485 function takes care of the decompression. This is the only difference
486 486 with _callstream.
487 487
488 488 returns the server reply as a file like object.
489 489 """
490 490 raise NotImplementedError()
491 491
492 492 def _callpush(self, cmd, fp, **args):
493 493 """execute a <cmd> on server
494 494
495 495 The command is expected to be related to a push. Push has a special
496 496 return method.
497 497
498 498 returns the server reply as a (ret, output) tuple. ret is either
499 499 empty (error) or a stringified int.
500 500 """
501 501 raise NotImplementedError()
502 502
503 503 def _calltwowaystream(self, cmd, fp, **args):
504 504 """execute <cmd> on server
505 505
506 506 The command will send a stream to the server and get a stream in reply.
507 507 """
508 508 raise NotImplementedError()
509 509
510 510 def _abort(self, exception):
511 511 """clearly abort the wire protocol connection and raise the exception
512 512 """
513 513 raise NotImplementedError()
514 514
515 515 # server side
516 516
517 517 # wire protocol command can either return a string or one of these classes.
518 518 class streamres(object):
519 519 """wireproto reply: binary stream
520 520
521 521 The call was successful and the result is a stream.
522 522
523 Accepts either a generator or an object with a ``read(size)`` method.
523 Accepts a generator containing chunks of data to be sent to the client.
524 524
525 525 ``v1compressible`` indicates whether this data can be compressed to
526 526 "version 1" clients (technically: HTTP peers using
527 527 application/mercurial-0.1 media type). This flag should NOT be used on
528 528 new commands because new clients should support a more modern compression
529 529 mechanism.
530 530 """
531 def __init__(self, gen=None, reader=None, v1compressible=False):
531 def __init__(self, gen=None, v1compressible=False):
532 532 self.gen = gen
533 self.reader = reader
534 533 self.v1compressible = v1compressible
535 534
536 535 class pushres(object):
537 536 """wireproto reply: success with simple integer return
538 537
539 538 The call was successful and returned an integer contained in `self.res`.
540 539 """
541 540 def __init__(self, res):
542 541 self.res = res
543 542
544 543 class pusherr(object):
545 544 """wireproto reply: failure
546 545
547 546 The call failed. The `self.res` attribute contains the error message.
548 547 """
549 548 def __init__(self, res):
550 549 self.res = res
551 550
552 551 class ooberror(object):
553 552 """wireproto reply: failure of a batch of operation
554 553
555 554 Something failed during a batch call. The error message is stored in
556 555 `self.message`.
557 556 """
558 557 def __init__(self, message):
559 558 self.message = message
560 559
561 560 def getdispatchrepo(repo, proto, command):
562 561 """Obtain the repo used for processing wire protocol commands.
563 562
564 563 The intent of this function is to serve as a monkeypatch point for
565 564 extensions that need commands to operate on different repo views under
566 565 specialized circumstances.
567 566 """
568 567 return repo.filtered('served')
569 568
570 569 def dispatch(repo, proto, command):
571 570 repo = getdispatchrepo(repo, proto, command)
572 571 func, spec = commands[command]
573 572 args = proto.getargs(spec)
574 573 return func(repo, proto, *args)
575 574
576 575 def options(cmd, keys, others):
577 576 opts = {}
578 577 for k in keys:
579 578 if k in others:
580 579 opts[k] = others[k]
581 580 del others[k]
582 581 if others:
583 582 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
584 583 % (cmd, ",".join(others)))
585 584 return opts
586 585
587 586 def bundle1allowed(repo, action):
588 587 """Whether a bundle1 operation is allowed from the server.
589 588
590 589 Priority is:
591 590
592 591 1. server.bundle1gd.<action> (if generaldelta active)
593 592 2. server.bundle1.<action>
594 593 3. server.bundle1gd (if generaldelta active)
595 594 4. server.bundle1
596 595 """
597 596 ui = repo.ui
598 597 gd = 'generaldelta' in repo.requirements
599 598
600 599 if gd:
601 600 v = ui.configbool('server', 'bundle1gd.%s' % action)
602 601 if v is not None:
603 602 return v
604 603
605 604 v = ui.configbool('server', 'bundle1.%s' % action)
606 605 if v is not None:
607 606 return v
608 607
609 608 if gd:
610 609 v = ui.configbool('server', 'bundle1gd')
611 610 if v is not None:
612 611 return v
613 612
614 613 return ui.configbool('server', 'bundle1')
615 614
616 615 def supportedcompengines(ui, proto, role):
617 616 """Obtain the list of supported compression engines for a request."""
618 617 assert role in (util.CLIENTROLE, util.SERVERROLE)
619 618
620 619 compengines = util.compengines.supportedwireengines(role)
621 620
622 621 # Allow config to override default list and ordering.
623 622 if role == util.SERVERROLE:
624 623 configengines = ui.configlist('server', 'compressionengines')
625 624 config = 'server.compressionengines'
626 625 else:
627 626 # This is currently implemented mainly to facilitate testing. In most
628 627 # cases, the server should be in charge of choosing a compression engine
629 628 # because a server has the most to lose from a sub-optimal choice. (e.g.
630 629 # CPU DoS due to an expensive engine or a network DoS due to poor
631 630 # compression ratio).
632 631 configengines = ui.configlist('experimental',
633 632 'clientcompressionengines')
634 633 config = 'experimental.clientcompressionengines'
635 634
636 635 # No explicit config. Filter out the ones that aren't supposed to be
637 636 # advertised and return default ordering.
638 637 if not configengines:
639 638 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
640 639 return [e for e in compengines
641 640 if getattr(e.wireprotosupport(), attr) > 0]
642 641
643 642 # If compression engines are listed in the config, assume there is a good
644 643 # reason for it (like server operators wanting to achieve specific
645 644 # performance characteristics). So fail fast if the config references
646 645 # unusable compression engines.
647 646 validnames = set(e.name() for e in compengines)
648 647 invalidnames = set(e for e in configengines if e not in validnames)
649 648 if invalidnames:
650 649 raise error.Abort(_('invalid compression engine defined in %s: %s') %
651 650 (config, ', '.join(sorted(invalidnames))))
652 651
653 652 compengines = [e for e in compengines if e.name() in configengines]
654 653 compengines = sorted(compengines,
655 654 key=lambda e: configengines.index(e.name()))
656 655
657 656 if not compengines:
658 657 raise error.Abort(_('%s config option does not specify any known '
659 658 'compression engines') % config,
660 659 hint=_('usable compression engines: %s') %
661 660 ', '.sorted(validnames))
662 661
663 662 return compengines
664 663
665 664 # list of commands
666 665 commands = {}
667 666
668 667 def wireprotocommand(name, args=''):
669 668 """decorator for wire protocol command"""
670 669 def register(func):
671 670 commands[name] = (func, args)
672 671 return func
673 672 return register
674 673
675 674 @wireprotocommand('batch', 'cmds *')
676 675 def batch(repo, proto, cmds, others):
677 676 repo = repo.filtered("served")
678 677 res = []
679 678 for pair in cmds.split(';'):
680 679 op, args = pair.split(' ', 1)
681 680 vals = {}
682 681 for a in args.split(','):
683 682 if a:
684 683 n, v = a.split('=')
685 684 vals[unescapearg(n)] = unescapearg(v)
686 685 func, spec = commands[op]
687 686 if spec:
688 687 keys = spec.split()
689 688 data = {}
690 689 for k in keys:
691 690 if k == '*':
692 691 star = {}
693 692 for key in vals.keys():
694 693 if key not in keys:
695 694 star[key] = vals[key]
696 695 data['*'] = star
697 696 else:
698 697 data[k] = vals[k]
699 698 result = func(repo, proto, *[data[k] for k in keys])
700 699 else:
701 700 result = func(repo, proto)
702 701 if isinstance(result, ooberror):
703 702 return result
704 703 res.append(escapearg(result))
705 704 return ';'.join(res)
706 705
707 706 @wireprotocommand('between', 'pairs')
708 707 def between(repo, proto, pairs):
709 708 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
710 709 r = []
711 710 for b in repo.between(pairs):
712 711 r.append(encodelist(b) + "\n")
713 712 return "".join(r)
714 713
715 714 @wireprotocommand('branchmap')
716 715 def branchmap(repo, proto):
717 716 branchmap = repo.branchmap()
718 717 heads = []
719 718 for branch, nodes in branchmap.iteritems():
720 719 branchname = urlreq.quote(encoding.fromlocal(branch))
721 720 branchnodes = encodelist(nodes)
722 721 heads.append('%s %s' % (branchname, branchnodes))
723 722 return '\n'.join(heads)
724 723
725 724 @wireprotocommand('branches', 'nodes')
726 725 def branches(repo, proto, nodes):
727 726 nodes = decodelist(nodes)
728 727 r = []
729 728 for b in repo.branches(nodes):
730 729 r.append(encodelist(b) + "\n")
731 730 return "".join(r)
732 731
733 732 @wireprotocommand('clonebundles', '')
734 733 def clonebundles(repo, proto):
735 734 """Server command for returning info for available bundles to seed clones.
736 735
737 736 Clients will parse this response and determine what bundle to fetch.
738 737
739 738 Extensions may wrap this command to filter or dynamically emit data
740 739 depending on the request. e.g. you could advertise URLs for the closest
741 740 data center given the client's IP address.
742 741 """
743 742 return repo.vfs.tryread('clonebundles.manifest')
744 743
745 744 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
746 745 'known', 'getbundle', 'unbundlehash', 'batch']
747 746
748 747 def _capabilities(repo, proto):
749 748 """return a list of capabilities for a repo
750 749
751 750 This function exists to allow extensions to easily wrap capabilities
752 751 computation
753 752
754 753 - returns a lists: easy to alter
755 754 - change done here will be propagated to both `capabilities` and `hello`
756 755 command without any other action needed.
757 756 """
758 757 # copy to prevent modification of the global list
759 758 caps = list(wireprotocaps)
760 759 if streamclone.allowservergeneration(repo):
761 760 if repo.ui.configbool('server', 'preferuncompressed'):
762 761 caps.append('stream-preferred')
763 762 requiredformats = repo.requirements & repo.supportedformats
764 763 # if our local revlogs are just revlogv1, add 'stream' cap
765 764 if not requiredformats - {'revlogv1'}:
766 765 caps.append('stream')
767 766 # otherwise, add 'streamreqs' detailing our local revlog format
768 767 else:
769 768 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
770 769 if repo.ui.configbool('experimental', 'bundle2-advertise'):
771 770 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
772 771 caps.append('bundle2=' + urlreq.quote(capsblob))
773 772 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
774 773
775 774 if proto.name == 'http':
776 775 caps.append('httpheader=%d' %
777 776 repo.ui.configint('server', 'maxhttpheaderlen'))
778 777 if repo.ui.configbool('experimental', 'httppostargs'):
779 778 caps.append('httppostargs')
780 779
781 780 # FUTURE advertise 0.2rx once support is implemented
782 781 # FUTURE advertise minrx and mintx after consulting config option
783 782 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
784 783
785 784 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
786 785 if compengines:
787 786 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
788 787 for e in compengines)
789 788 caps.append('compression=%s' % comptypes)
790 789
791 790 return caps
792 791
793 792 # If you are writing an extension and consider wrapping this function. Wrap
794 793 # `_capabilities` instead.
795 794 @wireprotocommand('capabilities')
796 795 def capabilities(repo, proto):
797 796 return ' '.join(_capabilities(repo, proto))
798 797
799 798 @wireprotocommand('changegroup', 'roots')
800 799 def changegroup(repo, proto, roots):
801 800 nodes = decodelist(roots)
802 801 outgoing = discovery.outgoing(repo, missingroots=nodes,
803 802 missingheads=repo.heads())
804 803 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
805 return streamres(reader=cg, v1compressible=True)
804 gen = iter(lambda: cg.read(32768), '')
805 return streamres(gen=gen, v1compressible=True)
806 806
807 807 @wireprotocommand('changegroupsubset', 'bases heads')
808 808 def changegroupsubset(repo, proto, bases, heads):
809 809 bases = decodelist(bases)
810 810 heads = decodelist(heads)
811 811 outgoing = discovery.outgoing(repo, missingroots=bases,
812 812 missingheads=heads)
813 813 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
814 return streamres(reader=cg, v1compressible=True)
814 gen = iter(lambda: cg.read(32768), '')
815 return streamres(gen=gen, v1compressible=True)
815 816
816 817 @wireprotocommand('debugwireargs', 'one two *')
817 818 def debugwireargs(repo, proto, one, two, others):
818 819 # only accept optional args from the known set
819 820 opts = options('debugwireargs', ['three', 'four'], others)
820 821 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
821 822
822 823 @wireprotocommand('getbundle', '*')
823 824 def getbundle(repo, proto, others):
824 825 opts = options('getbundle', gboptsmap.keys(), others)
825 826 for k, v in opts.iteritems():
826 827 keytype = gboptsmap[k]
827 828 if keytype == 'nodes':
828 829 opts[k] = decodelist(v)
829 830 elif keytype == 'csv':
830 831 opts[k] = list(v.split(','))
831 832 elif keytype == 'scsv':
832 833 opts[k] = set(v.split(','))
833 834 elif keytype == 'boolean':
834 835 # Client should serialize False as '0', which is a non-empty string
835 836 # so it evaluates as a True bool.
836 837 if v == '0':
837 838 opts[k] = False
838 839 else:
839 840 opts[k] = bool(v)
840 841 elif keytype != 'plain':
841 842 raise KeyError('unknown getbundle option type %s'
842 843 % keytype)
843 844
844 845 if not bundle1allowed(repo, 'pull'):
845 846 if not exchange.bundle2requested(opts.get('bundlecaps')):
846 847 if proto.name == 'http':
847 848 return ooberror(bundle2required)
848 849 raise error.Abort(bundle2requiredmain,
849 850 hint=bundle2requiredhint)
850 851
851 852 try:
852 853 if repo.ui.configbool('server', 'disablefullbundle'):
853 854 # Check to see if this is a full clone.
854 855 clheads = set(repo.changelog.heads())
855 856 heads = set(opts.get('heads', set()))
856 857 common = set(opts.get('common', set()))
857 858 common.discard(nullid)
858 859 if not common and clheads == heads:
859 860 raise error.Abort(
860 861 _('server has pull-based clones disabled'),
861 862 hint=_('remove --pull if specified or upgrade Mercurial'))
862 863
863 864 chunks = exchange.getbundlechunks(repo, 'serve',
864 865 **pycompat.strkwargs(opts))
865 866 except error.Abort as exc:
866 867 # cleanly forward Abort error to the client
867 868 if not exchange.bundle2requested(opts.get('bundlecaps')):
868 869 if proto.name == 'http':
869 870 return ooberror(str(exc) + '\n')
870 871 raise # cannot do better for bundle1 + ssh
871 872 # bundle2 request expect a bundle2 reply
872 873 bundler = bundle2.bundle20(repo.ui)
873 874 manargs = [('message', str(exc))]
874 875 advargs = []
875 876 if exc.hint is not None:
876 877 advargs.append(('hint', exc.hint))
877 878 bundler.addpart(bundle2.bundlepart('error:abort',
878 879 manargs, advargs))
879 880 return streamres(gen=bundler.getchunks(), v1compressible=True)
880 881 return streamres(gen=chunks, v1compressible=True)
881 882
882 883 @wireprotocommand('heads')
883 884 def heads(repo, proto):
884 885 h = repo.heads()
885 886 return encodelist(h) + "\n"
886 887
887 888 @wireprotocommand('hello')
888 889 def hello(repo, proto):
889 890 '''the hello command returns a set of lines describing various
890 891 interesting things about the server, in an RFC822-like format.
891 892 Currently the only one defined is "capabilities", which
892 893 consists of a line in the form:
893 894
894 895 capabilities: space separated list of tokens
895 896 '''
896 897 return "capabilities: %s\n" % (capabilities(repo, proto))
897 898
898 899 @wireprotocommand('listkeys', 'namespace')
899 900 def listkeys(repo, proto, namespace):
900 901 d = repo.listkeys(encoding.tolocal(namespace)).items()
901 902 return pushkeymod.encodekeys(d)
902 903
903 904 @wireprotocommand('lookup', 'key')
904 905 def lookup(repo, proto, key):
905 906 try:
906 907 k = encoding.tolocal(key)
907 908 c = repo[k]
908 909 r = c.hex()
909 910 success = 1
910 911 except Exception as inst:
911 912 r = str(inst)
912 913 success = 0
913 914 return "%d %s\n" % (success, r)
914 915
915 916 @wireprotocommand('known', 'nodes *')
916 917 def known(repo, proto, nodes, others):
917 918 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
918 919
919 920 @wireprotocommand('pushkey', 'namespace key old new')
920 921 def pushkey(repo, proto, namespace, key, old, new):
921 922 # compatibility with pre-1.8 clients which were accidentally
922 923 # sending raw binary nodes rather than utf-8-encoded hex
923 924 if len(new) == 20 and util.escapestr(new) != new:
924 925 # looks like it could be a binary node
925 926 try:
926 927 new.decode('utf-8')
927 928 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
928 929 except UnicodeDecodeError:
929 930 pass # binary, leave unmodified
930 931 else:
931 932 new = encoding.tolocal(new) # normal path
932 933
933 934 if util.safehasattr(proto, 'restore'):
934 935
935 936 proto.redirect()
936 937
937 938 try:
938 939 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
939 940 encoding.tolocal(old), new) or False
940 941 except error.Abort:
941 942 r = False
942 943
943 944 output = proto.restore()
944 945
945 946 return '%s\n%s' % (int(r), output)
946 947
947 948 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
948 949 encoding.tolocal(old), new)
949 950 return '%s\n' % int(r)
950 951
951 952 @wireprotocommand('stream_out')
952 953 def stream(repo, proto):
953 954 '''If the server supports streaming clone, it advertises the "stream"
954 955 capability with a value representing the version and flags of the repo
955 956 it is serving. Client checks to see if it understands the format.
956 957 '''
957 958 return streamres(streamclone.generatev1wireproto(repo))
958 959
959 960 @wireprotocommand('unbundle', 'heads')
960 961 def unbundle(repo, proto, heads):
961 962 their_heads = decodelist(heads)
962 963
963 964 try:
964 965 proto.redirect()
965 966
966 967 exchange.check_heads(repo, their_heads, 'preparing changes')
967 968
968 969 # write bundle data to temporary file because it can be big
969 970 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
970 971 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
971 972 r = 0
972 973 try:
973 974 proto.getfile(fp)
974 975 fp.seek(0)
975 976 gen = exchange.readbundle(repo.ui, fp, None)
976 977 if (isinstance(gen, changegroupmod.cg1unpacker)
977 978 and not bundle1allowed(repo, 'push')):
978 979 if proto.name == 'http':
979 980 # need to special case http because stderr do not get to
980 981 # the http client on failed push so we need to abuse some
981 982 # other error type to make sure the message get to the
982 983 # user.
983 984 return ooberror(bundle2required)
984 985 raise error.Abort(bundle2requiredmain,
985 986 hint=bundle2requiredhint)
986 987
987 988 r = exchange.unbundle(repo, gen, their_heads, 'serve',
988 989 proto._client())
989 990 if util.safehasattr(r, 'addpart'):
990 991 # The return looks streamable, we are in the bundle2 case and
991 992 # should return a stream.
992 993 return streamres(gen=r.getchunks())
993 994 return pushres(r)
994 995
995 996 finally:
996 997 fp.close()
997 998 os.unlink(tempname)
998 999
999 1000 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1000 1001 # handle non-bundle2 case first
1001 1002 if not getattr(exc, 'duringunbundle2', False):
1002 1003 try:
1003 1004 raise
1004 1005 except error.Abort:
1005 1006 # The old code we moved used util.stderr directly.
1006 1007 # We did not change it to minimise code change.
1007 1008 # This need to be moved to something proper.
1008 1009 # Feel free to do it.
1009 1010 util.stderr.write("abort: %s\n" % exc)
1010 1011 if exc.hint is not None:
1011 1012 util.stderr.write("(%s)\n" % exc.hint)
1012 1013 return pushres(0)
1013 1014 except error.PushRaced:
1014 1015 return pusherr(str(exc))
1015 1016
1016 1017 bundler = bundle2.bundle20(repo.ui)
1017 1018 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1018 1019 bundler.addpart(out)
1019 1020 try:
1020 1021 try:
1021 1022 raise
1022 1023 except error.PushkeyFailed as exc:
1023 1024 # check client caps
1024 1025 remotecaps = getattr(exc, '_replycaps', None)
1025 1026 if (remotecaps is not None
1026 1027 and 'pushkey' not in remotecaps.get('error', ())):
1027 1028 # no support remote side, fallback to Abort handler.
1028 1029 raise
1029 1030 part = bundler.newpart('error:pushkey')
1030 1031 part.addparam('in-reply-to', exc.partid)
1031 1032 if exc.namespace is not None:
1032 1033 part.addparam('namespace', exc.namespace, mandatory=False)
1033 1034 if exc.key is not None:
1034 1035 part.addparam('key', exc.key, mandatory=False)
1035 1036 if exc.new is not None:
1036 1037 part.addparam('new', exc.new, mandatory=False)
1037 1038 if exc.old is not None:
1038 1039 part.addparam('old', exc.old, mandatory=False)
1039 1040 if exc.ret is not None:
1040 1041 part.addparam('ret', exc.ret, mandatory=False)
1041 1042 except error.BundleValueError as exc:
1042 1043 errpart = bundler.newpart('error:unsupportedcontent')
1043 1044 if exc.parttype is not None:
1044 1045 errpart.addparam('parttype', exc.parttype)
1045 1046 if exc.params:
1046 1047 errpart.addparam('params', '\0'.join(exc.params))
1047 1048 except error.Abort as exc:
1048 1049 manargs = [('message', str(exc))]
1049 1050 advargs = []
1050 1051 if exc.hint is not None:
1051 1052 advargs.append(('hint', exc.hint))
1052 1053 bundler.addpart(bundle2.bundlepart('error:abort',
1053 1054 manargs, advargs))
1054 1055 except error.PushRaced as exc:
1055 1056 bundler.newpart('error:pushraced', [('message', str(exc))])
1056 1057 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now