##// END OF EJS Templates
wireproto: use maybecapturestdio() for push responses (API)...
Gregory Szorc -
r36084:caca3ac2 default
parent child Browse files
Show More
@@ -1,190 +1,189
1 1 # Copyright 2011 Fog Creek Software
2 2 #
3 3 # This software may be used and distributed according to the terms of the
4 4 # GNU General Public License version 2 or any later version.
5 5 from __future__ import absolute_import
6 6
7 7 import os
8 8 import re
9 9
10 10 from mercurial.i18n import _
11 11
12 12 from mercurial import (
13 13 error,
14 14 httppeer,
15 15 util,
16 16 wireproto,
17 17 )
18 18
19 19 from . import (
20 20 lfutil,
21 21 )
22 22
23 23 urlerr = util.urlerr
24 24 urlreq = util.urlreq
25 25
26 26 LARGEFILES_REQUIRED_MSG = ('\nThis repository uses the largefiles extension.'
27 27 '\n\nPlease enable it in your Mercurial config '
28 28 'file.\n')
29 29
30 30 # these will all be replaced by largefiles.uisetup
31 31 ssholdcallstream = None
32 32 httpoldcallstream = None
33 33
34 34 def putlfile(repo, proto, sha):
35 35 '''Server command for putting a largefile into a repository's local store
36 36 and into the user cache.'''
37 proto.redirect()
38
39 path = lfutil.storepath(repo, sha)
40 util.makedirs(os.path.dirname(path))
41 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
37 with proto.mayberedirectstdio() as output:
38 path = lfutil.storepath(repo, sha)
39 util.makedirs(os.path.dirname(path))
40 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
42 41
43 try:
44 proto.getfile(tmpfp)
45 tmpfp._fp.seek(0)
46 if sha != lfutil.hexsha1(tmpfp._fp):
47 raise IOError(0, _('largefile contents do not match hash'))
48 tmpfp.close()
49 lfutil.linktousercache(repo, sha)
50 except IOError as e:
51 repo.ui.warn(_('largefiles: failed to put %s into store: %s\n') %
52 (sha, e.strerror))
53 return wireproto.pushres(1)
54 finally:
55 tmpfp.discard()
42 try:
43 proto.getfile(tmpfp)
44 tmpfp._fp.seek(0)
45 if sha != lfutil.hexsha1(tmpfp._fp):
46 raise IOError(0, _('largefile contents do not match hash'))
47 tmpfp.close()
48 lfutil.linktousercache(repo, sha)
49 except IOError as e:
50 repo.ui.warn(_('largefiles: failed to put %s into store: %s\n') %
51 (sha, e.strerror))
52 return wireproto.pushres(1, output.getvalue() if output else '')
53 finally:
54 tmpfp.discard()
56 55
57 return wireproto.pushres(0)
56 return wireproto.pushres(0, output.getvalue() if output else '')
58 57
59 58 def getlfile(repo, proto, sha):
60 59 '''Server command for retrieving a largefile from the repository-local
61 60 cache or user cache.'''
62 61 filename = lfutil.findfile(repo, sha)
63 62 if not filename:
64 63 raise error.Abort(_('requested largefile %s not present in cache')
65 64 % sha)
66 65 f = open(filename, 'rb')
67 66 length = os.fstat(f.fileno())[6]
68 67
69 68 # Since we can't set an HTTP content-length header here, and
70 69 # Mercurial core provides no way to give the length of a streamres
71 70 # (and reading the entire file into RAM would be ill-advised), we
72 71 # just send the length on the first line of the response, like the
73 72 # ssh proto does for string responses.
74 73 def generator():
75 74 yield '%d\n' % length
76 75 for chunk in util.filechunkiter(f):
77 76 yield chunk
78 77 return wireproto.streamres_legacy(gen=generator())
79 78
80 79 def statlfile(repo, proto, sha):
81 80 '''Server command for checking if a largefile is present - returns '2\n' if
82 81 the largefile is missing, '0\n' if it seems to be in good condition.
83 82
84 83 The value 1 is reserved for mismatched checksum, but that is too expensive
85 84 to be verified on every stat and must be caught be running 'hg verify'
86 85 server side.'''
87 86 filename = lfutil.findfile(repo, sha)
88 87 if not filename:
89 88 return '2\n'
90 89 return '0\n'
91 90
92 91 def wirereposetup(ui, repo):
93 92 class lfileswirerepository(repo.__class__):
94 93 def putlfile(self, sha, fd):
95 94 # unfortunately, httprepository._callpush tries to convert its
96 95 # input file-like into a bundle before sending it, so we can't use
97 96 # it ...
98 97 if issubclass(self.__class__, httppeer.httppeer):
99 98 res = self._call('putlfile', data=fd, sha=sha,
100 99 headers={'content-type':'application/mercurial-0.1'})
101 100 try:
102 101 d, output = res.split('\n', 1)
103 102 for l in output.splitlines(True):
104 103 self.ui.warn(_('remote: '), l) # assume l ends with \n
105 104 return int(d)
106 105 except ValueError:
107 106 self.ui.warn(_('unexpected putlfile response: %r\n') % res)
108 107 return 1
109 108 # ... but we can't use sshrepository._call because the data=
110 109 # argument won't get sent, and _callpush does exactly what we want
111 110 # in this case: send the data straight through
112 111 else:
113 112 try:
114 113 ret, output = self._callpush("putlfile", fd, sha=sha)
115 114 if ret == "":
116 115 raise error.ResponseError(_('putlfile failed:'),
117 116 output)
118 117 return int(ret)
119 118 except IOError:
120 119 return 1
121 120 except ValueError:
122 121 raise error.ResponseError(
123 122 _('putlfile failed (unexpected response):'), ret)
124 123
125 124 def getlfile(self, sha):
126 125 """returns an iterable with the chunks of the file with sha sha"""
127 126 stream = self._callstream("getlfile", sha=sha)
128 127 length = stream.readline()
129 128 try:
130 129 length = int(length)
131 130 except ValueError:
132 131 self._abort(error.ResponseError(_("unexpected response:"),
133 132 length))
134 133
135 134 # SSH streams will block if reading more than length
136 135 for chunk in util.filechunkiter(stream, limit=length):
137 136 yield chunk
138 137 # HTTP streams must hit the end to process the last empty
139 138 # chunk of Chunked-Encoding so the connection can be reused.
140 139 if issubclass(self.__class__, httppeer.httppeer):
141 140 chunk = stream.read(1)
142 141 if chunk:
143 142 self._abort(error.ResponseError(_("unexpected response:"),
144 143 chunk))
145 144
146 145 @wireproto.batchable
147 146 def statlfile(self, sha):
148 147 f = wireproto.future()
149 148 result = {'sha': sha}
150 149 yield result, f
151 150 try:
152 151 yield int(f.value)
153 152 except (ValueError, urlerr.httperror):
154 153 # If the server returns anything but an integer followed by a
155 154 # newline, newline, it's not speaking our language; if we get
156 155 # an HTTP error, we can't be sure the largefile is present;
157 156 # either way, consider it missing.
158 157 yield 2
159 158
160 159 repo.__class__ = lfileswirerepository
161 160
162 161 # advertise the largefiles=serve capability
163 162 def _capabilities(orig, repo, proto):
164 163 '''announce largefile server capability'''
165 164 caps = orig(repo, proto)
166 165 caps.append('largefiles=serve')
167 166 return caps
168 167
169 168 def heads(repo, proto):
170 169 '''Wrap server command - largefile capable clients will know to call
171 170 lheads instead'''
172 171 if lfutil.islfilesrepo(repo):
173 172 return wireproto.ooberror(LARGEFILES_REQUIRED_MSG)
174 173 return wireproto.heads(repo, proto)
175 174
176 175 def sshrepocallstream(self, cmd, **args):
177 176 if cmd == 'heads' and self.capable('largefiles'):
178 177 cmd = 'lheads'
179 178 if cmd == 'batch' and self.capable('largefiles'):
180 179 args[r'cmds'] = args[r'cmds'].replace('heads ', 'lheads ')
181 180 return ssholdcallstream(self, cmd, **args)
182 181
183 182 headsre = re.compile(r'(^|;)heads\b')
184 183
185 184 def httprepocallstream(self, cmd, **args):
186 185 if cmd == 'heads' and self.capable('largefiles'):
187 186 cmd = 'lheads'
188 187 if cmd == 'batch' and self.capable('largefiles'):
189 188 args[r'cmds'] = headsre.sub('lheads', args[r'cmds'])
190 189 return httpoldcallstream(self, cmd, **args)
@@ -1,1093 +1,1096
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 discovery,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 repository,
32 32 streamclone,
33 33 util,
34 34 )
35 35
36 36 urlerr = util.urlerr
37 37 urlreq = util.urlreq
38 38
39 39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 41 'IncompatibleClient')
42 42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43 43
44 44 class remoteiterbatcher(peer.iterbatcher):
45 45 def __init__(self, remote):
46 46 super(remoteiterbatcher, self).__init__()
47 47 self._remote = remote
48 48
49 49 def __getattr__(self, name):
50 50 # Validate this method is batchable, since submit() only supports
51 51 # batchable methods.
52 52 fn = getattr(self._remote, name)
53 53 if not getattr(fn, 'batchable', None):
54 54 raise error.ProgrammingError('Attempted to batch a non-batchable '
55 55 'call to %r' % name)
56 56
57 57 return super(remoteiterbatcher, self).__getattr__(name)
58 58
59 59 def submit(self):
60 60 """Break the batch request into many patch calls and pipeline them.
61 61
62 62 This is mostly valuable over http where request sizes can be
63 63 limited, but can be used in other places as well.
64 64 """
65 65 # 2-tuple of (command, arguments) that represents what will be
66 66 # sent over the wire.
67 67 requests = []
68 68
69 69 # 4-tuple of (command, final future, @batchable generator, remote
70 70 # future).
71 71 results = []
72 72
73 73 for command, args, opts, finalfuture in self.calls:
74 74 mtd = getattr(self._remote, command)
75 75 batchable = mtd.batchable(mtd.__self__, *args, **opts)
76 76
77 77 commandargs, fremote = next(batchable)
78 78 assert fremote
79 79 requests.append((command, commandargs))
80 80 results.append((command, finalfuture, batchable, fremote))
81 81
82 82 if requests:
83 83 self._resultiter = self._remote._submitbatch(requests)
84 84
85 85 self._results = results
86 86
87 87 def results(self):
88 88 for command, finalfuture, batchable, remotefuture in self._results:
89 89 # Get the raw result, set it in the remote future, feed it
90 90 # back into the @batchable generator so it can be decoded, and
91 91 # set the result on the final future to this value.
92 92 remoteresult = next(self._resultiter)
93 93 remotefuture.set(remoteresult)
94 94 finalfuture.set(next(batchable))
95 95
96 96 # Verify our @batchable generators only emit 2 values.
97 97 try:
98 98 next(batchable)
99 99 except StopIteration:
100 100 pass
101 101 else:
102 102 raise error.ProgrammingError('%s @batchable generator emitted '
103 103 'unexpected value count' % command)
104 104
105 105 yield finalfuture.value
106 106
107 107 # Forward a couple of names from peer to make wireproto interactions
108 108 # slightly more sensible.
109 109 batchable = peer.batchable
110 110 future = peer.future
111 111
112 112 # list of nodes encoding / decoding
113 113
114 114 def decodelist(l, sep=' '):
115 115 if l:
116 116 return [bin(v) for v in l.split(sep)]
117 117 return []
118 118
119 119 def encodelist(l, sep=' '):
120 120 try:
121 121 return sep.join(map(hex, l))
122 122 except TypeError:
123 123 raise
124 124
125 125 # batched call argument encoding
126 126
127 127 def escapearg(plain):
128 128 return (plain
129 129 .replace(':', ':c')
130 130 .replace(',', ':o')
131 131 .replace(';', ':s')
132 132 .replace('=', ':e'))
133 133
134 134 def unescapearg(escaped):
135 135 return (escaped
136 136 .replace(':e', '=')
137 137 .replace(':s', ';')
138 138 .replace(':o', ',')
139 139 .replace(':c', ':'))
140 140
141 141 def encodebatchcmds(req):
142 142 """Return a ``cmds`` argument value for the ``batch`` command."""
143 143 cmds = []
144 144 for op, argsdict in req:
145 145 # Old servers didn't properly unescape argument names. So prevent
146 146 # the sending of argument names that may not be decoded properly by
147 147 # servers.
148 148 assert all(escapearg(k) == k for k in argsdict)
149 149
150 150 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
151 151 for k, v in argsdict.iteritems())
152 152 cmds.append('%s %s' % (op, args))
153 153
154 154 return ';'.join(cmds)
155 155
156 156 # mapping of options accepted by getbundle and their types
157 157 #
158 158 # Meant to be extended by extensions. It is extensions responsibility to ensure
159 159 # such options are properly processed in exchange.getbundle.
160 160 #
161 161 # supported types are:
162 162 #
163 163 # :nodes: list of binary nodes
164 164 # :csv: list of comma-separated values
165 165 # :scsv: list of comma-separated values return as set
166 166 # :plain: string with no transformation needed.
167 167 gboptsmap = {'heads': 'nodes',
168 168 'bookmarks': 'boolean',
169 169 'common': 'nodes',
170 170 'obsmarkers': 'boolean',
171 171 'phases': 'boolean',
172 172 'bundlecaps': 'scsv',
173 173 'listkeys': 'csv',
174 174 'cg': 'boolean',
175 175 'cbattempted': 'boolean',
176 176 'stream': 'boolean',
177 177 }
178 178
179 179 # client side
180 180
181 181 class wirepeer(repository.legacypeer):
182 182 """Client-side interface for communicating with a peer repository.
183 183
184 184 Methods commonly call wire protocol commands of the same name.
185 185
186 186 See also httppeer.py and sshpeer.py for protocol-specific
187 187 implementations of this interface.
188 188 """
189 189 # Begin of basewirepeer interface.
190 190
191 191 def iterbatch(self):
192 192 return remoteiterbatcher(self)
193 193
194 194 @batchable
195 195 def lookup(self, key):
196 196 self.requirecap('lookup', _('look up remote revision'))
197 197 f = future()
198 198 yield {'key': encoding.fromlocal(key)}, f
199 199 d = f.value
200 200 success, data = d[:-1].split(" ", 1)
201 201 if int(success):
202 202 yield bin(data)
203 203 else:
204 204 self._abort(error.RepoError(data))
205 205
206 206 @batchable
207 207 def heads(self):
208 208 f = future()
209 209 yield {}, f
210 210 d = f.value
211 211 try:
212 212 yield decodelist(d[:-1])
213 213 except ValueError:
214 214 self._abort(error.ResponseError(_("unexpected response:"), d))
215 215
216 216 @batchable
217 217 def known(self, nodes):
218 218 f = future()
219 219 yield {'nodes': encodelist(nodes)}, f
220 220 d = f.value
221 221 try:
222 222 yield [bool(int(b)) for b in d]
223 223 except ValueError:
224 224 self._abort(error.ResponseError(_("unexpected response:"), d))
225 225
226 226 @batchable
227 227 def branchmap(self):
228 228 f = future()
229 229 yield {}, f
230 230 d = f.value
231 231 try:
232 232 branchmap = {}
233 233 for branchpart in d.splitlines():
234 234 branchname, branchheads = branchpart.split(' ', 1)
235 235 branchname = encoding.tolocal(urlreq.unquote(branchname))
236 236 branchheads = decodelist(branchheads)
237 237 branchmap[branchname] = branchheads
238 238 yield branchmap
239 239 except TypeError:
240 240 self._abort(error.ResponseError(_("unexpected response:"), d))
241 241
242 242 @batchable
243 243 def listkeys(self, namespace):
244 244 if not self.capable('pushkey'):
245 245 yield {}, None
246 246 f = future()
247 247 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
248 248 yield {'namespace': encoding.fromlocal(namespace)}, f
249 249 d = f.value
250 250 self.ui.debug('received listkey for "%s": %i bytes\n'
251 251 % (namespace, len(d)))
252 252 yield pushkeymod.decodekeys(d)
253 253
254 254 @batchable
255 255 def pushkey(self, namespace, key, old, new):
256 256 if not self.capable('pushkey'):
257 257 yield False, None
258 258 f = future()
259 259 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
260 260 yield {'namespace': encoding.fromlocal(namespace),
261 261 'key': encoding.fromlocal(key),
262 262 'old': encoding.fromlocal(old),
263 263 'new': encoding.fromlocal(new)}, f
264 264 d = f.value
265 265 d, output = d.split('\n', 1)
266 266 try:
267 267 d = bool(int(d))
268 268 except ValueError:
269 269 raise error.ResponseError(
270 270 _('push failed (unexpected response):'), d)
271 271 for l in output.splitlines(True):
272 272 self.ui.status(_('remote: '), l)
273 273 yield d
274 274
275 275 def stream_out(self):
276 276 return self._callstream('stream_out')
277 277
278 278 def getbundle(self, source, **kwargs):
279 279 kwargs = pycompat.byteskwargs(kwargs)
280 280 self.requirecap('getbundle', _('look up remote changes'))
281 281 opts = {}
282 282 bundlecaps = kwargs.get('bundlecaps')
283 283 if bundlecaps is not None:
284 284 kwargs['bundlecaps'] = sorted(bundlecaps)
285 285 else:
286 286 bundlecaps = () # kwargs could have it to None
287 287 for key, value in kwargs.iteritems():
288 288 if value is None:
289 289 continue
290 290 keytype = gboptsmap.get(key)
291 291 if keytype is None:
292 292 raise error.ProgrammingError(
293 293 'Unexpectedly None keytype for key %s' % key)
294 294 elif keytype == 'nodes':
295 295 value = encodelist(value)
296 296 elif keytype in ('csv', 'scsv'):
297 297 value = ','.join(value)
298 298 elif keytype == 'boolean':
299 299 value = '%i' % bool(value)
300 300 elif keytype != 'plain':
301 301 raise KeyError('unknown getbundle option type %s'
302 302 % keytype)
303 303 opts[key] = value
304 304 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
305 305 if any((cap.startswith('HG2') for cap in bundlecaps)):
306 306 return bundle2.getunbundler(self.ui, f)
307 307 else:
308 308 return changegroupmod.cg1unpacker(f, 'UN')
309 309
310 310 def unbundle(self, cg, heads, url):
311 311 '''Send cg (a readable file-like object representing the
312 312 changegroup to push, typically a chunkbuffer object) to the
313 313 remote server as a bundle.
314 314
315 315 When pushing a bundle10 stream, return an integer indicating the
316 316 result of the push (see changegroup.apply()).
317 317
318 318 When pushing a bundle20 stream, return a bundle20 stream.
319 319
320 320 `url` is the url the client thinks it's pushing to, which is
321 321 visible to hooks.
322 322 '''
323 323
324 324 if heads != ['force'] and self.capable('unbundlehash'):
325 325 heads = encodelist(['hashed',
326 326 hashlib.sha1(''.join(sorted(heads))).digest()])
327 327 else:
328 328 heads = encodelist(heads)
329 329
330 330 if util.safehasattr(cg, 'deltaheader'):
331 331 # this a bundle10, do the old style call sequence
332 332 ret, output = self._callpush("unbundle", cg, heads=heads)
333 333 if ret == "":
334 334 raise error.ResponseError(
335 335 _('push failed:'), output)
336 336 try:
337 337 ret = int(ret)
338 338 except ValueError:
339 339 raise error.ResponseError(
340 340 _('push failed (unexpected response):'), ret)
341 341
342 342 for l in output.splitlines(True):
343 343 self.ui.status(_('remote: '), l)
344 344 else:
345 345 # bundle2 push. Send a stream, fetch a stream.
346 346 stream = self._calltwowaystream('unbundle', cg, heads=heads)
347 347 ret = bundle2.getunbundler(self.ui, stream)
348 348 return ret
349 349
350 350 # End of basewirepeer interface.
351 351
352 352 # Begin of baselegacywirepeer interface.
353 353
354 354 def branches(self, nodes):
355 355 n = encodelist(nodes)
356 356 d = self._call("branches", nodes=n)
357 357 try:
358 358 br = [tuple(decodelist(b)) for b in d.splitlines()]
359 359 return br
360 360 except ValueError:
361 361 self._abort(error.ResponseError(_("unexpected response:"), d))
362 362
363 363 def between(self, pairs):
364 364 batch = 8 # avoid giant requests
365 365 r = []
366 366 for i in xrange(0, len(pairs), batch):
367 367 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
368 368 d = self._call("between", pairs=n)
369 369 try:
370 370 r.extend(l and decodelist(l) or [] for l in d.splitlines())
371 371 except ValueError:
372 372 self._abort(error.ResponseError(_("unexpected response:"), d))
373 373 return r
374 374
375 375 def changegroup(self, nodes, kind):
376 376 n = encodelist(nodes)
377 377 f = self._callcompressable("changegroup", roots=n)
378 378 return changegroupmod.cg1unpacker(f, 'UN')
379 379
380 380 def changegroupsubset(self, bases, heads, kind):
381 381 self.requirecap('changegroupsubset', _('look up remote changes'))
382 382 bases = encodelist(bases)
383 383 heads = encodelist(heads)
384 384 f = self._callcompressable("changegroupsubset",
385 385 bases=bases, heads=heads)
386 386 return changegroupmod.cg1unpacker(f, 'UN')
387 387
388 388 # End of baselegacywirepeer interface.
389 389
390 390 def _submitbatch(self, req):
391 391 """run batch request <req> on the server
392 392
393 393 Returns an iterator of the raw responses from the server.
394 394 """
395 395 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
396 396 chunk = rsp.read(1024)
397 397 work = [chunk]
398 398 while chunk:
399 399 while ';' not in chunk and chunk:
400 400 chunk = rsp.read(1024)
401 401 work.append(chunk)
402 402 merged = ''.join(work)
403 403 while ';' in merged:
404 404 one, merged = merged.split(';', 1)
405 405 yield unescapearg(one)
406 406 chunk = rsp.read(1024)
407 407 work = [merged, chunk]
408 408 yield unescapearg(''.join(work))
409 409
410 410 def _submitone(self, op, args):
411 411 return self._call(op, **pycompat.strkwargs(args))
412 412
413 413 def debugwireargs(self, one, two, three=None, four=None, five=None):
414 414 # don't pass optional arguments left at their default value
415 415 opts = {}
416 416 if three is not None:
417 417 opts[r'three'] = three
418 418 if four is not None:
419 419 opts[r'four'] = four
420 420 return self._call('debugwireargs', one=one, two=two, **opts)
421 421
422 422 def _call(self, cmd, **args):
423 423 """execute <cmd> on the server
424 424
425 425 The command is expected to return a simple string.
426 426
427 427 returns the server reply as a string."""
428 428 raise NotImplementedError()
429 429
430 430 def _callstream(self, cmd, **args):
431 431 """execute <cmd> on the server
432 432
433 433 The command is expected to return a stream. Note that if the
434 434 command doesn't return a stream, _callstream behaves
435 435 differently for ssh and http peers.
436 436
437 437 returns the server reply as a file like object.
438 438 """
439 439 raise NotImplementedError()
440 440
441 441 def _callcompressable(self, cmd, **args):
442 442 """execute <cmd> on the server
443 443
444 444 The command is expected to return a stream.
445 445
446 446 The stream may have been compressed in some implementations. This
447 447 function takes care of the decompression. This is the only difference
448 448 with _callstream.
449 449
450 450 returns the server reply as a file like object.
451 451 """
452 452 raise NotImplementedError()
453 453
454 454 def _callpush(self, cmd, fp, **args):
455 455 """execute a <cmd> on server
456 456
457 457 The command is expected to be related to a push. Push has a special
458 458 return method.
459 459
460 460 returns the server reply as a (ret, output) tuple. ret is either
461 461 empty (error) or a stringified int.
462 462 """
463 463 raise NotImplementedError()
464 464
465 465 def _calltwowaystream(self, cmd, fp, **args):
466 466 """execute <cmd> on server
467 467
468 468 The command will send a stream to the server and get a stream in reply.
469 469 """
470 470 raise NotImplementedError()
471 471
472 472 def _abort(self, exception):
473 473 """clearly abort the wire protocol connection and raise the exception
474 474 """
475 475 raise NotImplementedError()
476 476
477 477 # server side
478 478
479 479 # wire protocol command can either return a string or one of these classes.
480 480 class streamres(object):
481 481 """wireproto reply: binary stream
482 482
483 483 The call was successful and the result is a stream.
484 484
485 485 Accepts a generator containing chunks of data to be sent to the client.
486 486
487 487 ``prefer_uncompressed`` indicates that the data is expected to be
488 488 uncompressable and that the stream should therefore use the ``none``
489 489 engine.
490 490 """
491 491 def __init__(self, gen=None, prefer_uncompressed=False):
492 492 self.gen = gen
493 493 self.prefer_uncompressed = prefer_uncompressed
494 494
495 495 class streamres_legacy(object):
496 496 """wireproto reply: uncompressed binary stream
497 497
498 498 The call was successful and the result is a stream.
499 499
500 500 Accepts a generator containing chunks of data to be sent to the client.
501 501
502 502 Like ``streamres``, but sends an uncompressed data for "version 1" clients
503 503 using the application/mercurial-0.1 media type.
504 504 """
505 505 def __init__(self, gen=None):
506 506 self.gen = gen
507 507
508 508 class pushres(object):
509 509 """wireproto reply: success with simple integer return
510 510
511 511 The call was successful and returned an integer contained in `self.res`.
512 512 """
513 def __init__(self, res):
513 def __init__(self, res, output):
514 514 self.res = res
515 self.output = output
515 516
516 517 class pusherr(object):
517 518 """wireproto reply: failure
518 519
519 520 The call failed. The `self.res` attribute contains the error message.
520 521 """
521 def __init__(self, res):
522 def __init__(self, res, output):
522 523 self.res = res
524 self.output = output
523 525
524 526 class ooberror(object):
525 527 """wireproto reply: failure of a batch of operation
526 528
527 529 Something failed during a batch call. The error message is stored in
528 530 `self.message`.
529 531 """
530 532 def __init__(self, message):
531 533 self.message = message
532 534
533 535 def getdispatchrepo(repo, proto, command):
534 536 """Obtain the repo used for processing wire protocol commands.
535 537
536 538 The intent of this function is to serve as a monkeypatch point for
537 539 extensions that need commands to operate on different repo views under
538 540 specialized circumstances.
539 541 """
540 542 return repo.filtered('served')
541 543
542 544 def dispatch(repo, proto, command):
543 545 repo = getdispatchrepo(repo, proto, command)
544 546 func, spec = commands[command]
545 547 args = proto.getargs(spec)
546 548 return func(repo, proto, *args)
547 549
548 550 def options(cmd, keys, others):
549 551 opts = {}
550 552 for k in keys:
551 553 if k in others:
552 554 opts[k] = others[k]
553 555 del others[k]
554 556 if others:
555 557 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
556 558 % (cmd, ",".join(others)))
557 559 return opts
558 560
559 561 def bundle1allowed(repo, action):
560 562 """Whether a bundle1 operation is allowed from the server.
561 563
562 564 Priority is:
563 565
564 566 1. server.bundle1gd.<action> (if generaldelta active)
565 567 2. server.bundle1.<action>
566 568 3. server.bundle1gd (if generaldelta active)
567 569 4. server.bundle1
568 570 """
569 571 ui = repo.ui
570 572 gd = 'generaldelta' in repo.requirements
571 573
572 574 if gd:
573 575 v = ui.configbool('server', 'bundle1gd.%s' % action)
574 576 if v is not None:
575 577 return v
576 578
577 579 v = ui.configbool('server', 'bundle1.%s' % action)
578 580 if v is not None:
579 581 return v
580 582
581 583 if gd:
582 584 v = ui.configbool('server', 'bundle1gd')
583 585 if v is not None:
584 586 return v
585 587
586 588 return ui.configbool('server', 'bundle1')
587 589
588 590 def supportedcompengines(ui, proto, role):
589 591 """Obtain the list of supported compression engines for a request."""
590 592 assert role in (util.CLIENTROLE, util.SERVERROLE)
591 593
592 594 compengines = util.compengines.supportedwireengines(role)
593 595
594 596 # Allow config to override default list and ordering.
595 597 if role == util.SERVERROLE:
596 598 configengines = ui.configlist('server', 'compressionengines')
597 599 config = 'server.compressionengines'
598 600 else:
599 601 # This is currently implemented mainly to facilitate testing. In most
600 602 # cases, the server should be in charge of choosing a compression engine
601 603 # because a server has the most to lose from a sub-optimal choice. (e.g.
602 604 # CPU DoS due to an expensive engine or a network DoS due to poor
603 605 # compression ratio).
604 606 configengines = ui.configlist('experimental',
605 607 'clientcompressionengines')
606 608 config = 'experimental.clientcompressionengines'
607 609
608 610 # No explicit config. Filter out the ones that aren't supposed to be
609 611 # advertised and return default ordering.
610 612 if not configengines:
611 613 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
612 614 return [e for e in compengines
613 615 if getattr(e.wireprotosupport(), attr) > 0]
614 616
615 617 # If compression engines are listed in the config, assume there is a good
616 618 # reason for it (like server operators wanting to achieve specific
617 619 # performance characteristics). So fail fast if the config references
618 620 # unusable compression engines.
619 621 validnames = set(e.name() for e in compengines)
620 622 invalidnames = set(e for e in configengines if e not in validnames)
621 623 if invalidnames:
622 624 raise error.Abort(_('invalid compression engine defined in %s: %s') %
623 625 (config, ', '.join(sorted(invalidnames))))
624 626
625 627 compengines = [e for e in compengines if e.name() in configengines]
626 628 compengines = sorted(compengines,
627 629 key=lambda e: configengines.index(e.name()))
628 630
629 631 if not compengines:
630 632 raise error.Abort(_('%s config option does not specify any known '
631 633 'compression engines') % config,
632 634 hint=_('usable compression engines: %s') %
633 635 ', '.sorted(validnames))
634 636
635 637 return compengines
636 638
637 639 class commandentry(object):
638 640 """Represents a declared wire protocol command."""
639 641 def __init__(self, func, args=''):
640 642 self.func = func
641 643 self.args = args
642 644
643 645 def _merge(self, func, args):
644 646 """Merge this instance with an incoming 2-tuple.
645 647
646 648 This is called when a caller using the old 2-tuple API attempts
647 649 to replace an instance. The incoming values are merged with
648 650 data not captured by the 2-tuple and a new instance containing
649 651 the union of the two objects is returned.
650 652 """
651 653 return commandentry(func, args)
652 654
653 655 # Old code treats instances as 2-tuples. So expose that interface.
654 656 def __iter__(self):
655 657 yield self.func
656 658 yield self.args
657 659
658 660 def __getitem__(self, i):
659 661 if i == 0:
660 662 return self.func
661 663 elif i == 1:
662 664 return self.args
663 665 else:
664 666 raise IndexError('can only access elements 0 and 1')
665 667
666 668 class commanddict(dict):
667 669 """Container for registered wire protocol commands.
668 670
669 671 It behaves like a dict. But __setitem__ is overwritten to allow silent
670 672 coercion of values from 2-tuples for API compatibility.
671 673 """
672 674 def __setitem__(self, k, v):
673 675 if isinstance(v, commandentry):
674 676 pass
675 677 # Cast 2-tuples to commandentry instances.
676 678 elif isinstance(v, tuple):
677 679 if len(v) != 2:
678 680 raise ValueError('command tuples must have exactly 2 elements')
679 681
680 682 # It is common for extensions to wrap wire protocol commands via
681 683 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
682 684 # doing this aren't aware of the new API that uses objects to store
683 685 # command entries, we automatically merge old state with new.
684 686 if k in self:
685 687 v = self[k]._merge(v[0], v[1])
686 688 else:
687 689 v = commandentry(v[0], v[1])
688 690 else:
689 691 raise ValueError('command entries must be commandentry instances '
690 692 'or 2-tuples')
691 693
692 694 return super(commanddict, self).__setitem__(k, v)
693 695
694 696 def commandavailable(self, command, proto):
695 697 """Determine if a command is available for the requested protocol."""
696 698 # For now, commands are available for all protocols. So do a simple
697 699 # membership test.
698 700 return command in self
699 701
700 702 commands = commanddict()
701 703
702 704 def wireprotocommand(name, args=''):
703 705 """Decorator to declare a wire protocol command.
704 706
705 707 ``name`` is the name of the wire protocol command being provided.
706 708
707 709 ``args`` is a space-delimited list of named arguments that the command
708 710 accepts. ``*`` is a special value that says to accept all arguments.
709 711 """
710 712 def register(func):
711 713 commands[name] = commandentry(func, args)
712 714 return func
713 715 return register
714 716
715 717 @wireprotocommand('batch', 'cmds *')
716 718 def batch(repo, proto, cmds, others):
717 719 repo = repo.filtered("served")
718 720 res = []
719 721 for pair in cmds.split(';'):
720 722 op, args = pair.split(' ', 1)
721 723 vals = {}
722 724 for a in args.split(','):
723 725 if a:
724 726 n, v = a.split('=')
725 727 vals[unescapearg(n)] = unescapearg(v)
726 728 func, spec = commands[op]
727 729 if spec:
728 730 keys = spec.split()
729 731 data = {}
730 732 for k in keys:
731 733 if k == '*':
732 734 star = {}
733 735 for key in vals.keys():
734 736 if key not in keys:
735 737 star[key] = vals[key]
736 738 data['*'] = star
737 739 else:
738 740 data[k] = vals[k]
739 741 result = func(repo, proto, *[data[k] for k in keys])
740 742 else:
741 743 result = func(repo, proto)
742 744 if isinstance(result, ooberror):
743 745 return result
744 746 res.append(escapearg(result))
745 747 return ';'.join(res)
746 748
747 749 @wireprotocommand('between', 'pairs')
748 750 def between(repo, proto, pairs):
749 751 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
750 752 r = []
751 753 for b in repo.between(pairs):
752 754 r.append(encodelist(b) + "\n")
753 755 return "".join(r)
754 756
755 757 @wireprotocommand('branchmap')
756 758 def branchmap(repo, proto):
757 759 branchmap = repo.branchmap()
758 760 heads = []
759 761 for branch, nodes in branchmap.iteritems():
760 762 branchname = urlreq.quote(encoding.fromlocal(branch))
761 763 branchnodes = encodelist(nodes)
762 764 heads.append('%s %s' % (branchname, branchnodes))
763 765 return '\n'.join(heads)
764 766
765 767 @wireprotocommand('branches', 'nodes')
766 768 def branches(repo, proto, nodes):
767 769 nodes = decodelist(nodes)
768 770 r = []
769 771 for b in repo.branches(nodes):
770 772 r.append(encodelist(b) + "\n")
771 773 return "".join(r)
772 774
773 775 @wireprotocommand('clonebundles', '')
774 776 def clonebundles(repo, proto):
775 777 """Server command for returning info for available bundles to seed clones.
776 778
777 779 Clients will parse this response and determine what bundle to fetch.
778 780
779 781 Extensions may wrap this command to filter or dynamically emit data
780 782 depending on the request. e.g. you could advertise URLs for the closest
781 783 data center given the client's IP address.
782 784 """
783 785 return repo.vfs.tryread('clonebundles.manifest')
784 786
785 787 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
786 788 'known', 'getbundle', 'unbundlehash', 'batch']
787 789
788 790 def _capabilities(repo, proto):
789 791 """return a list of capabilities for a repo
790 792
791 793 This function exists to allow extensions to easily wrap capabilities
792 794 computation
793 795
794 796 - returns a lists: easy to alter
795 797 - change done here will be propagated to both `capabilities` and `hello`
796 798 command without any other action needed.
797 799 """
798 800 # copy to prevent modification of the global list
799 801 caps = list(wireprotocaps)
800 802 if streamclone.allowservergeneration(repo):
801 803 if repo.ui.configbool('server', 'preferuncompressed'):
802 804 caps.append('stream-preferred')
803 805 requiredformats = repo.requirements & repo.supportedformats
804 806 # if our local revlogs are just revlogv1, add 'stream' cap
805 807 if not requiredformats - {'revlogv1'}:
806 808 caps.append('stream')
807 809 # otherwise, add 'streamreqs' detailing our local revlog format
808 810 else:
809 811 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
810 812 if repo.ui.configbool('experimental', 'bundle2-advertise'):
811 813 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
812 814 caps.append('bundle2=' + urlreq.quote(capsblob))
813 815 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
814 816
815 817 if proto.name == 'http':
816 818 caps.append('httpheader=%d' %
817 819 repo.ui.configint('server', 'maxhttpheaderlen'))
818 820 if repo.ui.configbool('experimental', 'httppostargs'):
819 821 caps.append('httppostargs')
820 822
821 823 # FUTURE advertise 0.2rx once support is implemented
822 824 # FUTURE advertise minrx and mintx after consulting config option
823 825 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
824 826
825 827 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
826 828 if compengines:
827 829 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
828 830 for e in compengines)
829 831 caps.append('compression=%s' % comptypes)
830 832
831 833 return caps
832 834
833 835 # If you are writing an extension and consider wrapping this function. Wrap
834 836 # `_capabilities` instead.
835 837 @wireprotocommand('capabilities')
836 838 def capabilities(repo, proto):
837 839 return ' '.join(_capabilities(repo, proto))
838 840
839 841 @wireprotocommand('changegroup', 'roots')
840 842 def changegroup(repo, proto, roots):
841 843 nodes = decodelist(roots)
842 844 outgoing = discovery.outgoing(repo, missingroots=nodes,
843 845 missingheads=repo.heads())
844 846 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
845 847 gen = iter(lambda: cg.read(32768), '')
846 848 return streamres(gen=gen)
847 849
848 850 @wireprotocommand('changegroupsubset', 'bases heads')
849 851 def changegroupsubset(repo, proto, bases, heads):
850 852 bases = decodelist(bases)
851 853 heads = decodelist(heads)
852 854 outgoing = discovery.outgoing(repo, missingroots=bases,
853 855 missingheads=heads)
854 856 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
855 857 gen = iter(lambda: cg.read(32768), '')
856 858 return streamres(gen=gen)
857 859
858 860 @wireprotocommand('debugwireargs', 'one two *')
859 861 def debugwireargs(repo, proto, one, two, others):
860 862 # only accept optional args from the known set
861 863 opts = options('debugwireargs', ['three', 'four'], others)
862 864 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
863 865
864 866 @wireprotocommand('getbundle', '*')
865 867 def getbundle(repo, proto, others):
866 868 opts = options('getbundle', gboptsmap.keys(), others)
867 869 for k, v in opts.iteritems():
868 870 keytype = gboptsmap[k]
869 871 if keytype == 'nodes':
870 872 opts[k] = decodelist(v)
871 873 elif keytype == 'csv':
872 874 opts[k] = list(v.split(','))
873 875 elif keytype == 'scsv':
874 876 opts[k] = set(v.split(','))
875 877 elif keytype == 'boolean':
876 878 # Client should serialize False as '0', which is a non-empty string
877 879 # so it evaluates as a True bool.
878 880 if v == '0':
879 881 opts[k] = False
880 882 else:
881 883 opts[k] = bool(v)
882 884 elif keytype != 'plain':
883 885 raise KeyError('unknown getbundle option type %s'
884 886 % keytype)
885 887
886 888 if not bundle1allowed(repo, 'pull'):
887 889 if not exchange.bundle2requested(opts.get('bundlecaps')):
888 890 if proto.name == 'http':
889 891 return ooberror(bundle2required)
890 892 raise error.Abort(bundle2requiredmain,
891 893 hint=bundle2requiredhint)
892 894
893 895 prefercompressed = True
894 896
895 897 try:
896 898 if repo.ui.configbool('server', 'disablefullbundle'):
897 899 # Check to see if this is a full clone.
898 900 clheads = set(repo.changelog.heads())
899 901 changegroup = opts.get('cg', True)
900 902 heads = set(opts.get('heads', set()))
901 903 common = set(opts.get('common', set()))
902 904 common.discard(nullid)
903 905 if changegroup and not common and clheads == heads:
904 906 raise error.Abort(
905 907 _('server has pull-based clones disabled'),
906 908 hint=_('remove --pull if specified or upgrade Mercurial'))
907 909
908 910 info, chunks = exchange.getbundlechunks(repo, 'serve',
909 911 **pycompat.strkwargs(opts))
910 912 prefercompressed = info.get('prefercompressed', True)
911 913 except error.Abort as exc:
912 914 # cleanly forward Abort error to the client
913 915 if not exchange.bundle2requested(opts.get('bundlecaps')):
914 916 if proto.name == 'http':
915 917 return ooberror(str(exc) + '\n')
916 918 raise # cannot do better for bundle1 + ssh
917 919 # bundle2 request expect a bundle2 reply
918 920 bundler = bundle2.bundle20(repo.ui)
919 921 manargs = [('message', str(exc))]
920 922 advargs = []
921 923 if exc.hint is not None:
922 924 advargs.append(('hint', exc.hint))
923 925 bundler.addpart(bundle2.bundlepart('error:abort',
924 926 manargs, advargs))
925 927 chunks = bundler.getchunks()
926 928 prefercompressed = False
927 929
928 930 return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
929 931
930 932 @wireprotocommand('heads')
931 933 def heads(repo, proto):
932 934 h = repo.heads()
933 935 return encodelist(h) + "\n"
934 936
935 937 @wireprotocommand('hello')
936 938 def hello(repo, proto):
937 939 '''the hello command returns a set of lines describing various
938 940 interesting things about the server, in an RFC822-like format.
939 941 Currently the only one defined is "capabilities", which
940 942 consists of a line in the form:
941 943
942 944 capabilities: space separated list of tokens
943 945 '''
944 946 return "capabilities: %s\n" % (capabilities(repo, proto))
945 947
946 948 @wireprotocommand('listkeys', 'namespace')
947 949 def listkeys(repo, proto, namespace):
948 950 d = repo.listkeys(encoding.tolocal(namespace)).items()
949 951 return pushkeymod.encodekeys(d)
950 952
951 953 @wireprotocommand('lookup', 'key')
952 954 def lookup(repo, proto, key):
953 955 try:
954 956 k = encoding.tolocal(key)
955 957 c = repo[k]
956 958 r = c.hex()
957 959 success = 1
958 960 except Exception as inst:
959 961 r = str(inst)
960 962 success = 0
961 963 return "%d %s\n" % (success, r)
962 964
963 965 @wireprotocommand('known', 'nodes *')
964 966 def known(repo, proto, nodes, others):
965 967 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
966 968
967 969 @wireprotocommand('pushkey', 'namespace key old new')
968 970 def pushkey(repo, proto, namespace, key, old, new):
969 971 # compatibility with pre-1.8 clients which were accidentally
970 972 # sending raw binary nodes rather than utf-8-encoded hex
971 973 if len(new) == 20 and util.escapestr(new) != new:
972 974 # looks like it could be a binary node
973 975 try:
974 976 new.decode('utf-8')
975 977 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
976 978 except UnicodeDecodeError:
977 979 pass # binary, leave unmodified
978 980 else:
979 981 new = encoding.tolocal(new) # normal path
980 982
981 983 with proto.mayberedirectstdio() as output:
982 984 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
983 985 encoding.tolocal(old), new) or False
984 986
985 987 output = output.getvalue() if output else ''
986 988 return '%s\n%s' % (int(r), output)
987 989
988 990 @wireprotocommand('stream_out')
989 991 def stream(repo, proto):
990 992 '''If the server supports streaming clone, it advertises the "stream"
991 993 capability with a value representing the version and flags of the repo
992 994 it is serving. Client checks to see if it understands the format.
993 995 '''
994 996 return streamres_legacy(streamclone.generatev1wireproto(repo))
995 997
996 998 @wireprotocommand('unbundle', 'heads')
997 999 def unbundle(repo, proto, heads):
998 1000 their_heads = decodelist(heads)
999 1001
1000 try:
1001 proto.redirect()
1002
1003 exchange.check_heads(repo, their_heads, 'preparing changes')
1004
1005 # write bundle data to temporary file because it can be big
1006 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1007 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1008 r = 0
1002 with proto.mayberedirectstdio() as output:
1009 1003 try:
1010 proto.getfile(fp)
1011 fp.seek(0)
1012 gen = exchange.readbundle(repo.ui, fp, None)
1013 if (isinstance(gen, changegroupmod.cg1unpacker)
1014 and not bundle1allowed(repo, 'push')):
1015 if proto.name == 'http':
1016 # need to special case http because stderr do not get to
1017 # the http client on failed push so we need to abuse some
1018 # other error type to make sure the message get to the
1019 # user.
1020 return ooberror(bundle2required)
1021 raise error.Abort(bundle2requiredmain,
1022 hint=bundle2requiredhint)
1004 exchange.check_heads(repo, their_heads, 'preparing changes')
1023 1005
1024 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1025 proto._client())
1026 if util.safehasattr(r, 'addpart'):
1027 # The return looks streamable, we are in the bundle2 case and
1028 # should return a stream.
1029 return streamres_legacy(gen=r.getchunks())
1030 return pushres(r)
1031
1032 finally:
1033 fp.close()
1034 os.unlink(tempname)
1035
1036 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1037 # handle non-bundle2 case first
1038 if not getattr(exc, 'duringunbundle2', False):
1006 # write bundle data to temporary file because it can be big
1007 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1008 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1009 r = 0
1039 1010 try:
1040 raise
1041 except error.Abort:
1042 # The old code we moved used util.stderr directly.
1043 # We did not change it to minimise code change.
1044 # This need to be moved to something proper.
1045 # Feel free to do it.
1046 util.stderr.write("abort: %s\n" % exc)
1047 if exc.hint is not None:
1048 util.stderr.write("(%s)\n" % exc.hint)
1049 return pushres(0)
1050 except error.PushRaced:
1051 return pusherr(str(exc))
1011 proto.getfile(fp)
1012 fp.seek(0)
1013 gen = exchange.readbundle(repo.ui, fp, None)
1014 if (isinstance(gen, changegroupmod.cg1unpacker)
1015 and not bundle1allowed(repo, 'push')):
1016 if proto.name == 'http':
1017 # need to special case http because stderr do not get to
1018 # the http client on failed push so we need to abuse
1019 # some other error type to make sure the message get to
1020 # the user.
1021 return ooberror(bundle2required)
1022 raise error.Abort(bundle2requiredmain,
1023 hint=bundle2requiredhint)
1052 1024
1053 bundler = bundle2.bundle20(repo.ui)
1054 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1055 bundler.addpart(out)
1056 try:
1057 try:
1058 raise
1059 except error.PushkeyFailed as exc:
1060 # check client caps
1061 remotecaps = getattr(exc, '_replycaps', None)
1062 if (remotecaps is not None
1063 and 'pushkey' not in remotecaps.get('error', ())):
1064 # no support remote side, fallback to Abort handler.
1025 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1026 proto._client())
1027 if util.safehasattr(r, 'addpart'):
1028 # The return looks streamable, we are in the bundle2 case
1029 # and should return a stream.
1030 return streamres_legacy(gen=r.getchunks())
1031 return pushres(r, output.getvalue() if output else '')
1032
1033 finally:
1034 fp.close()
1035 os.unlink(tempname)
1036
1037 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1038 # handle non-bundle2 case first
1039 if not getattr(exc, 'duringunbundle2', False):
1040 try:
1065 1041 raise
1066 part = bundler.newpart('error:pushkey')
1067 part.addparam('in-reply-to', exc.partid)
1068 if exc.namespace is not None:
1069 part.addparam('namespace', exc.namespace, mandatory=False)
1070 if exc.key is not None:
1071 part.addparam('key', exc.key, mandatory=False)
1072 if exc.new is not None:
1073 part.addparam('new', exc.new, mandatory=False)
1074 if exc.old is not None:
1075 part.addparam('old', exc.old, mandatory=False)
1076 if exc.ret is not None:
1077 part.addparam('ret', exc.ret, mandatory=False)
1078 except error.BundleValueError as exc:
1079 errpart = bundler.newpart('error:unsupportedcontent')
1080 if exc.parttype is not None:
1081 errpart.addparam('parttype', exc.parttype)
1082 if exc.params:
1083 errpart.addparam('params', '\0'.join(exc.params))
1084 except error.Abort as exc:
1085 manargs = [('message', str(exc))]
1086 advargs = []
1087 if exc.hint is not None:
1088 advargs.append(('hint', exc.hint))
1089 bundler.addpart(bundle2.bundlepart('error:abort',
1090 manargs, advargs))
1091 except error.PushRaced as exc:
1092 bundler.newpart('error:pushraced', [('message', str(exc))])
1093 return streamres_legacy(gen=bundler.getchunks())
1042 except error.Abort:
1043 # The old code we moved used util.stderr directly.
1044 # We did not change it to minimise code change.
1045 # This need to be moved to something proper.
1046 # Feel free to do it.
1047 util.stderr.write("abort: %s\n" % exc)
1048 if exc.hint is not None:
1049 util.stderr.write("(%s)\n" % exc.hint)
1050 return pushres(0, output.getvalue() if output else '')
1051 except error.PushRaced:
1052 return pusherr(str(exc),
1053 output.getvalue() if output else '')
1054
1055 bundler = bundle2.bundle20(repo.ui)
1056 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1057 bundler.addpart(out)
1058 try:
1059 try:
1060 raise
1061 except error.PushkeyFailed as exc:
1062 # check client caps
1063 remotecaps = getattr(exc, '_replycaps', None)
1064 if (remotecaps is not None
1065 and 'pushkey' not in remotecaps.get('error', ())):
1066 # no support remote side, fallback to Abort handler.
1067 raise
1068 part = bundler.newpart('error:pushkey')
1069 part.addparam('in-reply-to', exc.partid)
1070 if exc.namespace is not None:
1071 part.addparam('namespace', exc.namespace,
1072 mandatory=False)
1073 if exc.key is not None:
1074 part.addparam('key', exc.key, mandatory=False)
1075 if exc.new is not None:
1076 part.addparam('new', exc.new, mandatory=False)
1077 if exc.old is not None:
1078 part.addparam('old', exc.old, mandatory=False)
1079 if exc.ret is not None:
1080 part.addparam('ret', exc.ret, mandatory=False)
1081 except error.BundleValueError as exc:
1082 errpart = bundler.newpart('error:unsupportedcontent')
1083 if exc.parttype is not None:
1084 errpart.addparam('parttype', exc.parttype)
1085 if exc.params:
1086 errpart.addparam('params', '\0'.join(exc.params))
1087 except error.Abort as exc:
1088 manargs = [('message', str(exc))]
1089 advargs = []
1090 if exc.hint is not None:
1091 advargs.append(('hint', exc.hint))
1092 bundler.addpart(bundle2.bundlepart('error:abort',
1093 manargs, advargs))
1094 except error.PushRaced as exc:
1095 bundler.newpart('error:pushraced', [('message', str(exc))])
1096 return streamres_legacy(gen=bundler.getchunks())
@@ -1,481 +1,479
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import abc
10 10 import cgi
11 11 import contextlib
12 12 import struct
13 13 import sys
14 14
15 15 from .i18n import _
16 16 from . import (
17 17 encoding,
18 18 error,
19 19 hook,
20 20 pycompat,
21 21 util,
22 22 wireproto,
23 23 )
24 24
25 25 stringio = util.stringio
26 26
27 27 urlerr = util.urlerr
28 28 urlreq = util.urlreq
29 29
30 30 HTTP_OK = 200
31 31
32 32 HGTYPE = 'application/mercurial-0.1'
33 33 HGTYPE2 = 'application/mercurial-0.2'
34 34 HGERRTYPE = 'application/hg-error'
35 35
36 36 # Names of the SSH protocol implementations.
37 37 SSHV1 = 'ssh-v1'
38 38 # This is advertised over the wire. Incremental the counter at the end
39 39 # to reflect BC breakages.
40 40 SSHV2 = 'exp-ssh-v2-0001'
41 41
42 42 class baseprotocolhandler(object):
43 43 """Abstract base class for wire protocol handlers.
44 44
45 45 A wire protocol handler serves as an interface between protocol command
46 46 handlers and the wire protocol transport layer. Protocol handlers provide
47 47 methods to read command arguments, redirect stdio for the duration of
48 48 the request, handle response types, etc.
49 49 """
50 50
51 51 __metaclass__ = abc.ABCMeta
52 52
53 53 @abc.abstractproperty
54 54 def name(self):
55 55 """The name of the protocol implementation.
56 56
57 57 Used for uniquely identifying the transport type.
58 58 """
59 59
60 60 @abc.abstractmethod
61 61 def getargs(self, args):
62 62 """return the value for arguments in <args>
63 63
64 64 returns a list of values (same order as <args>)"""
65 65
66 66 @abc.abstractmethod
67 67 def getfile(self, fp):
68 68 """write the whole content of a file into a file like object
69 69
70 70 The file is in the form::
71 71
72 72 (<chunk-size>\n<chunk>)+0\n
73 73
74 74 chunk size is the ascii version of the int.
75 75 """
76 76
77 77 @abc.abstractmethod
78 78 def mayberedirectstdio(self):
79 79 """Context manager to possibly redirect stdio.
80 80
81 81 The context manager yields a file-object like object that receives
82 82 stdout and stderr output when the context manager is active. Or it
83 83 yields ``None`` if no I/O redirection occurs.
84 84
85 85 The intent of this context manager is to capture stdio output
86 86 so it may be sent in the response. Some transports support streaming
87 87 stdio to the client in real time. For these transports, stdio output
88 88 won't be captured.
89 89 """
90 90
91 91 @abc.abstractmethod
92 92 def redirect(self):
93 93 """may setup interception for stdout and stderr
94 94
95 95 See also the `restore` method."""
96 96
97 97 # If the `redirect` function does install interception, the `restore`
98 98 # function MUST be defined. If interception is not used, this function
99 99 # MUST NOT be defined.
100 100 #
101 101 # left commented here on purpose
102 102 #
103 103 #def restore(self):
104 104 # """reinstall previous stdout and stderr and return intercepted stdout
105 105 # """
106 106 # raise NotImplementedError()
107 107
108 108 def decodevaluefromheaders(req, headerprefix):
109 109 """Decode a long value from multiple HTTP request headers.
110 110
111 111 Returns the value as a bytes, not a str.
112 112 """
113 113 chunks = []
114 114 i = 1
115 115 prefix = headerprefix.upper().replace(r'-', r'_')
116 116 while True:
117 117 v = req.env.get(r'HTTP_%s_%d' % (prefix, i))
118 118 if v is None:
119 119 break
120 120 chunks.append(pycompat.bytesurl(v))
121 121 i += 1
122 122
123 123 return ''.join(chunks)
124 124
125 125 class webproto(baseprotocolhandler):
126 126 def __init__(self, req, ui):
127 127 self._req = req
128 128 self._ui = ui
129 129
130 130 @property
131 131 def name(self):
132 132 return 'http'
133 133
134 134 def getargs(self, args):
135 135 knownargs = self._args()
136 136 data = {}
137 137 keys = args.split()
138 138 for k in keys:
139 139 if k == '*':
140 140 star = {}
141 141 for key in knownargs.keys():
142 142 if key != 'cmd' and key not in keys:
143 143 star[key] = knownargs[key][0]
144 144 data['*'] = star
145 145 else:
146 146 data[k] = knownargs[k][0]
147 147 return [data[k] for k in keys]
148 148
149 149 def _args(self):
150 150 args = util.rapply(pycompat.bytesurl, self._req.form.copy())
151 151 postlen = int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
152 152 if postlen:
153 153 args.update(cgi.parse_qs(
154 154 self._req.read(postlen), keep_blank_values=True))
155 155 return args
156 156
157 157 argvalue = decodevaluefromheaders(self._req, r'X-HgArg')
158 158 args.update(cgi.parse_qs(argvalue, keep_blank_values=True))
159 159 return args
160 160
161 161 def getfile(self, fp):
162 162 length = int(self._req.env[r'CONTENT_LENGTH'])
163 163 # If httppostargs is used, we need to read Content-Length
164 164 # minus the amount that was consumed by args.
165 165 length -= int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
166 166 for s in util.filechunkiter(self._req, limit=length):
167 167 fp.write(s)
168 168
169 169 @contextlib.contextmanager
170 170 def mayberedirectstdio(self):
171 171 oldout = self._ui.fout
172 172 olderr = self._ui.ferr
173 173
174 174 out = util.stringio()
175 175
176 176 try:
177 177 self._ui.fout = out
178 178 self._ui.ferr = out
179 179 yield out
180 180 finally:
181 181 self._ui.fout = oldout
182 182 self._ui.ferr = olderr
183 183
184 184 def redirect(self):
185 185 self._oldio = self._ui.fout, self._ui.ferr
186 186 self._ui.ferr = self._ui.fout = stringio()
187 187
188 188 def restore(self):
189 189 val = self._ui.fout.getvalue()
190 190 self._ui.ferr, self._ui.fout = self._oldio
191 191 return val
192 192
193 193 def _client(self):
194 194 return 'remote:%s:%s:%s' % (
195 195 self._req.env.get('wsgi.url_scheme') or 'http',
196 196 urlreq.quote(self._req.env.get('REMOTE_HOST', '')),
197 197 urlreq.quote(self._req.env.get('REMOTE_USER', '')))
198 198
199 199 def responsetype(self, prefer_uncompressed):
200 200 """Determine the appropriate response type and compression settings.
201 201
202 202 Returns a tuple of (mediatype, compengine, engineopts).
203 203 """
204 204 # Determine the response media type and compression engine based
205 205 # on the request parameters.
206 206 protocaps = decodevaluefromheaders(self._req, r'X-HgProto').split(' ')
207 207
208 208 if '0.2' in protocaps:
209 209 # All clients are expected to support uncompressed data.
210 210 if prefer_uncompressed:
211 211 return HGTYPE2, util._noopengine(), {}
212 212
213 213 # Default as defined by wire protocol spec.
214 214 compformats = ['zlib', 'none']
215 215 for cap in protocaps:
216 216 if cap.startswith('comp='):
217 217 compformats = cap[5:].split(',')
218 218 break
219 219
220 220 # Now find an agreed upon compression format.
221 221 for engine in wireproto.supportedcompengines(self._ui, self,
222 222 util.SERVERROLE):
223 223 if engine.wireprotosupport().name in compformats:
224 224 opts = {}
225 225 level = self._ui.configint('server',
226 226 '%slevel' % engine.name())
227 227 if level is not None:
228 228 opts['level'] = level
229 229
230 230 return HGTYPE2, engine, opts
231 231
232 232 # No mutually supported compression format. Fall back to the
233 233 # legacy protocol.
234 234
235 235 # Don't allow untrusted settings because disabling compression or
236 236 # setting a very high compression level could lead to flooding
237 237 # the server's network or CPU.
238 238 opts = {'level': self._ui.configint('server', 'zliblevel')}
239 239 return HGTYPE, util.compengines['zlib'], opts
240 240
241 241 def iscmd(cmd):
242 242 return cmd in wireproto.commands
243 243
244 244 def parsehttprequest(repo, req, query):
245 245 """Parse the HTTP request for a wire protocol request.
246 246
247 247 If the current request appears to be a wire protocol request, this
248 248 function returns a dict with details about that request, including
249 249 an ``abstractprotocolserver`` instance suitable for handling the
250 250 request. Otherwise, ``None`` is returned.
251 251
252 252 ``req`` is a ``wsgirequest`` instance.
253 253 """
254 254 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
255 255 # string parameter. If it isn't present, this isn't a wire protocol
256 256 # request.
257 257 if r'cmd' not in req.form:
258 258 return None
259 259
260 260 cmd = pycompat.sysbytes(req.form[r'cmd'][0])
261 261
262 262 # The "cmd" request parameter is used by both the wire protocol and hgweb.
263 263 # While not all wire protocol commands are available for all transports,
264 264 # if we see a "cmd" value that resembles a known wire protocol command, we
265 265 # route it to a protocol handler. This is better than routing possible
266 266 # wire protocol requests to hgweb because it prevents hgweb from using
267 267 # known wire protocol commands and it is less confusing for machine
268 268 # clients.
269 269 if cmd not in wireproto.commands:
270 270 return None
271 271
272 272 proto = webproto(req, repo.ui)
273 273
274 274 return {
275 275 'cmd': cmd,
276 276 'proto': proto,
277 277 'dispatch': lambda: _callhttp(repo, req, proto, cmd),
278 278 'handleerror': lambda ex: _handlehttperror(ex, req, cmd),
279 279 }
280 280
281 281 def _callhttp(repo, req, proto, cmd):
282 282 def genversion2(gen, engine, engineopts):
283 283 # application/mercurial-0.2 always sends a payload header
284 284 # identifying the compression engine.
285 285 name = engine.wireprotosupport().name
286 286 assert 0 < len(name) < 256
287 287 yield struct.pack('B', len(name))
288 288 yield name
289 289
290 290 for chunk in gen:
291 291 yield chunk
292 292
293 293 rsp = wireproto.dispatch(repo, proto, cmd)
294 294
295 295 if not wireproto.commands.commandavailable(cmd, proto):
296 296 req.respond(HTTP_OK, HGERRTYPE,
297 297 body=_('requested wire protocol command is not available '
298 298 'over HTTP'))
299 299 return []
300 300
301 301 if isinstance(rsp, bytes):
302 302 req.respond(HTTP_OK, HGTYPE, body=rsp)
303 303 return []
304 304 elif isinstance(rsp, wireproto.streamres_legacy):
305 305 gen = rsp.gen
306 306 req.respond(HTTP_OK, HGTYPE)
307 307 return gen
308 308 elif isinstance(rsp, wireproto.streamres):
309 309 gen = rsp.gen
310 310
311 311 # This code for compression should not be streamres specific. It
312 312 # is here because we only compress streamres at the moment.
313 313 mediatype, engine, engineopts = proto.responsetype(
314 314 rsp.prefer_uncompressed)
315 315 gen = engine.compressstream(gen, engineopts)
316 316
317 317 if mediatype == HGTYPE2:
318 318 gen = genversion2(gen, engine, engineopts)
319 319
320 320 req.respond(HTTP_OK, mediatype)
321 321 return gen
322 322 elif isinstance(rsp, wireproto.pushres):
323 val = proto.restore()
324 rsp = '%d\n%s' % (rsp.res, val)
323 rsp = '%d\n%s' % (rsp.res, rsp.output)
325 324 req.respond(HTTP_OK, HGTYPE, body=rsp)
326 325 return []
327 326 elif isinstance(rsp, wireproto.pusherr):
328 327 # This is the httplib workaround documented in _handlehttperror().
329 328 req.drain()
330 329
331 proto.restore()
332 330 rsp = '0\n%s\n' % rsp.res
333 331 req.respond(HTTP_OK, HGTYPE, body=rsp)
334 332 return []
335 333 elif isinstance(rsp, wireproto.ooberror):
336 334 rsp = rsp.message
337 335 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
338 336 return []
339 337 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
340 338
341 339 def _handlehttperror(e, req, cmd):
342 340 """Called when an ErrorResponse is raised during HTTP request processing."""
343 341
344 342 # Clients using Python's httplib are stateful: the HTTP client
345 343 # won't process an HTTP response until all request data is
346 344 # sent to the server. The intent of this code is to ensure
347 345 # we always read HTTP request data from the client, thus
348 346 # ensuring httplib transitions to a state that allows it to read
349 347 # the HTTP response. In other words, it helps prevent deadlocks
350 348 # on clients using httplib.
351 349
352 350 if (req.env[r'REQUEST_METHOD'] == r'POST' and
353 351 # But not if Expect: 100-continue is being used.
354 352 (req.env.get('HTTP_EXPECT',
355 353 '').lower() != '100-continue') or
356 354 # Or the non-httplib HTTP library is being advertised by
357 355 # the client.
358 356 req.env.get('X-HgHttp2', '')):
359 357 req.drain()
360 358 else:
361 359 req.headers.append((r'Connection', r'Close'))
362 360
363 361 # TODO This response body assumes the failed command was
364 362 # "unbundle." That assumption is not always valid.
365 363 req.respond(e, HGTYPE, body='0\n%s\n' % e)
366 364
367 365 return ''
368 366
369 367 def _sshv1respondbytes(fout, value):
370 368 """Send a bytes response for protocol version 1."""
371 369 fout.write('%d\n' % len(value))
372 370 fout.write(value)
373 371 fout.flush()
374 372
375 373 def _sshv1respondstream(fout, source):
376 374 write = fout.write
377 375 for chunk in source.gen:
378 376 write(chunk)
379 377 fout.flush()
380 378
381 379 def _sshv1respondooberror(fout, ferr, rsp):
382 380 ferr.write(b'%s\n-\n' % rsp)
383 381 ferr.flush()
384 382 fout.write(b'\n')
385 383 fout.flush()
386 384
387 385 class sshv1protocolhandler(baseprotocolhandler):
388 386 """Handler for requests services via version 1 of SSH protocol."""
389 387 def __init__(self, ui, fin, fout):
390 388 self._ui = ui
391 389 self._fin = fin
392 390 self._fout = fout
393 391
394 392 @property
395 393 def name(self):
396 394 return 'ssh'
397 395
398 396 def getargs(self, args):
399 397 data = {}
400 398 keys = args.split()
401 399 for n in xrange(len(keys)):
402 400 argline = self._fin.readline()[:-1]
403 401 arg, l = argline.split()
404 402 if arg not in keys:
405 403 raise error.Abort(_("unexpected parameter %r") % arg)
406 404 if arg == '*':
407 405 star = {}
408 406 for k in xrange(int(l)):
409 407 argline = self._fin.readline()[:-1]
410 408 arg, l = argline.split()
411 409 val = self._fin.read(int(l))
412 410 star[arg] = val
413 411 data['*'] = star
414 412 else:
415 413 val = self._fin.read(int(l))
416 414 data[arg] = val
417 415 return [data[k] for k in keys]
418 416
419 417 def getfile(self, fpout):
420 418 _sshv1respondbytes(self._fout, b'')
421 419 count = int(self._fin.readline())
422 420 while count:
423 421 fpout.write(self._fin.read(count))
424 422 count = int(self._fin.readline())
425 423
426 424 @contextlib.contextmanager
427 425 def mayberedirectstdio(self):
428 426 yield None
429 427
430 428 def redirect(self):
431 429 pass
432 430
433 431 def _client(self):
434 432 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
435 433 return 'remote:ssh:' + client
436 434
437 435 class sshserver(object):
438 436 def __init__(self, ui, repo):
439 437 self._ui = ui
440 438 self._repo = repo
441 439 self._fin = ui.fin
442 440 self._fout = ui.fout
443 441
444 442 hook.redirect(True)
445 443 ui.fout = repo.ui.fout = ui.ferr
446 444
447 445 # Prevent insertion/deletion of CRs
448 446 util.setbinary(self._fin)
449 447 util.setbinary(self._fout)
450 448
451 449 self._proto = sshv1protocolhandler(self._ui, self._fin, self._fout)
452 450
453 451 def serve_forever(self):
454 452 while self.serve_one():
455 453 pass
456 454 sys.exit(0)
457 455
458 456 def serve_one(self):
459 457 cmd = self._fin.readline()[:-1]
460 458 if cmd and wireproto.commands.commandavailable(cmd, self._proto):
461 459 rsp = wireproto.dispatch(self._repo, self._proto, cmd)
462 460
463 461 if isinstance(rsp, bytes):
464 462 _sshv1respondbytes(self._fout, rsp)
465 463 elif isinstance(rsp, wireproto.streamres):
466 464 _sshv1respondstream(self._fout, rsp)
467 465 elif isinstance(rsp, wireproto.streamres_legacy):
468 466 _sshv1respondstream(self._fout, rsp)
469 467 elif isinstance(rsp, wireproto.pushres):
470 468 _sshv1respondbytes(self._fout, b'')
471 469 _sshv1respondbytes(self._fout, bytes(rsp.res))
472 470 elif isinstance(rsp, wireproto.pusherr):
473 471 _sshv1respondbytes(self._fout, rsp.res)
474 472 elif isinstance(rsp, wireproto.ooberror):
475 473 _sshv1respondooberror(self._fout, self._ui.ferr, rsp.message)
476 474 else:
477 475 raise error.ProgrammingError('unhandled response type from '
478 476 'wire protocol command: %s' % rsp)
479 477 elif cmd:
480 478 _sshv1respondbytes(self._fout, b'')
481 479 return cmd != ''
General Comments 0
You need to be logged in to leave comments. Login now