##// END OF EJS Templates
wireprotoserver: rename getfile() to forwardpayload() (API)...
Gregory Szorc -
r36087:90ca4986 default
parent child Browse files
Show More
@@ -1,189 +1,189 b''
1 1 # Copyright 2011 Fog Creek Software
2 2 #
3 3 # This software may be used and distributed according to the terms of the
4 4 # GNU General Public License version 2 or any later version.
5 5 from __future__ import absolute_import
6 6
7 7 import os
8 8 import re
9 9
10 10 from mercurial.i18n import _
11 11
12 12 from mercurial import (
13 13 error,
14 14 httppeer,
15 15 util,
16 16 wireproto,
17 17 )
18 18
19 19 from . import (
20 20 lfutil,
21 21 )
22 22
23 23 urlerr = util.urlerr
24 24 urlreq = util.urlreq
25 25
26 26 LARGEFILES_REQUIRED_MSG = ('\nThis repository uses the largefiles extension.'
27 27 '\n\nPlease enable it in your Mercurial config '
28 28 'file.\n')
29 29
30 30 # these will all be replaced by largefiles.uisetup
31 31 ssholdcallstream = None
32 32 httpoldcallstream = None
33 33
34 34 def putlfile(repo, proto, sha):
35 35 '''Server command for putting a largefile into a repository's local store
36 36 and into the user cache.'''
37 37 with proto.mayberedirectstdio() as output:
38 38 path = lfutil.storepath(repo, sha)
39 39 util.makedirs(os.path.dirname(path))
40 40 tmpfp = util.atomictempfile(path, createmode=repo.store.createmode)
41 41
42 42 try:
43 proto.getfile(tmpfp)
43 proto.forwardpayload(tmpfp)
44 44 tmpfp._fp.seek(0)
45 45 if sha != lfutil.hexsha1(tmpfp._fp):
46 46 raise IOError(0, _('largefile contents do not match hash'))
47 47 tmpfp.close()
48 48 lfutil.linktousercache(repo, sha)
49 49 except IOError as e:
50 50 repo.ui.warn(_('largefiles: failed to put %s into store: %s\n') %
51 51 (sha, e.strerror))
52 52 return wireproto.pushres(1, output.getvalue() if output else '')
53 53 finally:
54 54 tmpfp.discard()
55 55
56 56 return wireproto.pushres(0, output.getvalue() if output else '')
57 57
58 58 def getlfile(repo, proto, sha):
59 59 '''Server command for retrieving a largefile from the repository-local
60 60 cache or user cache.'''
61 61 filename = lfutil.findfile(repo, sha)
62 62 if not filename:
63 63 raise error.Abort(_('requested largefile %s not present in cache')
64 64 % sha)
65 65 f = open(filename, 'rb')
66 66 length = os.fstat(f.fileno())[6]
67 67
68 68 # Since we can't set an HTTP content-length header here, and
69 69 # Mercurial core provides no way to give the length of a streamres
70 70 # (and reading the entire file into RAM would be ill-advised), we
71 71 # just send the length on the first line of the response, like the
72 72 # ssh proto does for string responses.
73 73 def generator():
74 74 yield '%d\n' % length
75 75 for chunk in util.filechunkiter(f):
76 76 yield chunk
77 77 return wireproto.streamres_legacy(gen=generator())
78 78
79 79 def statlfile(repo, proto, sha):
80 80 '''Server command for checking if a largefile is present - returns '2\n' if
81 81 the largefile is missing, '0\n' if it seems to be in good condition.
82 82
83 83 The value 1 is reserved for mismatched checksum, but that is too expensive
84 84 to be verified on every stat and must be caught be running 'hg verify'
85 85 server side.'''
86 86 filename = lfutil.findfile(repo, sha)
87 87 if not filename:
88 88 return '2\n'
89 89 return '0\n'
90 90
91 91 def wirereposetup(ui, repo):
92 92 class lfileswirerepository(repo.__class__):
93 93 def putlfile(self, sha, fd):
94 94 # unfortunately, httprepository._callpush tries to convert its
95 95 # input file-like into a bundle before sending it, so we can't use
96 96 # it ...
97 97 if issubclass(self.__class__, httppeer.httppeer):
98 98 res = self._call('putlfile', data=fd, sha=sha,
99 99 headers={'content-type':'application/mercurial-0.1'})
100 100 try:
101 101 d, output = res.split('\n', 1)
102 102 for l in output.splitlines(True):
103 103 self.ui.warn(_('remote: '), l) # assume l ends with \n
104 104 return int(d)
105 105 except ValueError:
106 106 self.ui.warn(_('unexpected putlfile response: %r\n') % res)
107 107 return 1
108 108 # ... but we can't use sshrepository._call because the data=
109 109 # argument won't get sent, and _callpush does exactly what we want
110 110 # in this case: send the data straight through
111 111 else:
112 112 try:
113 113 ret, output = self._callpush("putlfile", fd, sha=sha)
114 114 if ret == "":
115 115 raise error.ResponseError(_('putlfile failed:'),
116 116 output)
117 117 return int(ret)
118 118 except IOError:
119 119 return 1
120 120 except ValueError:
121 121 raise error.ResponseError(
122 122 _('putlfile failed (unexpected response):'), ret)
123 123
124 124 def getlfile(self, sha):
125 125 """returns an iterable with the chunks of the file with sha sha"""
126 126 stream = self._callstream("getlfile", sha=sha)
127 127 length = stream.readline()
128 128 try:
129 129 length = int(length)
130 130 except ValueError:
131 131 self._abort(error.ResponseError(_("unexpected response:"),
132 132 length))
133 133
134 134 # SSH streams will block if reading more than length
135 135 for chunk in util.filechunkiter(stream, limit=length):
136 136 yield chunk
137 137 # HTTP streams must hit the end to process the last empty
138 138 # chunk of Chunked-Encoding so the connection can be reused.
139 139 if issubclass(self.__class__, httppeer.httppeer):
140 140 chunk = stream.read(1)
141 141 if chunk:
142 142 self._abort(error.ResponseError(_("unexpected response:"),
143 143 chunk))
144 144
145 145 @wireproto.batchable
146 146 def statlfile(self, sha):
147 147 f = wireproto.future()
148 148 result = {'sha': sha}
149 149 yield result, f
150 150 try:
151 151 yield int(f.value)
152 152 except (ValueError, urlerr.httperror):
153 153 # If the server returns anything but an integer followed by a
154 154 # newline, newline, it's not speaking our language; if we get
155 155 # an HTTP error, we can't be sure the largefile is present;
156 156 # either way, consider it missing.
157 157 yield 2
158 158
159 159 repo.__class__ = lfileswirerepository
160 160
161 161 # advertise the largefiles=serve capability
162 162 def _capabilities(orig, repo, proto):
163 163 '''announce largefile server capability'''
164 164 caps = orig(repo, proto)
165 165 caps.append('largefiles=serve')
166 166 return caps
167 167
168 168 def heads(repo, proto):
169 169 '''Wrap server command - largefile capable clients will know to call
170 170 lheads instead'''
171 171 if lfutil.islfilesrepo(repo):
172 172 return wireproto.ooberror(LARGEFILES_REQUIRED_MSG)
173 173 return wireproto.heads(repo, proto)
174 174
175 175 def sshrepocallstream(self, cmd, **args):
176 176 if cmd == 'heads' and self.capable('largefiles'):
177 177 cmd = 'lheads'
178 178 if cmd == 'batch' and self.capable('largefiles'):
179 179 args[r'cmds'] = args[r'cmds'].replace('heads ', 'lheads ')
180 180 return ssholdcallstream(self, cmd, **args)
181 181
182 182 headsre = re.compile(r'(^|;)heads\b')
183 183
184 184 def httprepocallstream(self, cmd, **args):
185 185 if cmd == 'heads' and self.capable('largefiles'):
186 186 cmd = 'lheads'
187 187 if cmd == 'batch' and self.capable('largefiles'):
188 188 args[r'cmds'] = headsre.sub('lheads', args[r'cmds'])
189 189 return httpoldcallstream(self, cmd, **args)
@@ -1,1096 +1,1096 b''
1 1 # wireproto.py - generic wire protocol support functions
2 2 #
3 3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 4 #
5 5 # This software may be used and distributed according to the terms of the
6 6 # GNU General Public License version 2 or any later version.
7 7
8 8 from __future__ import absolute_import
9 9
10 10 import hashlib
11 11 import os
12 12 import tempfile
13 13
14 14 from .i18n import _
15 15 from .node import (
16 16 bin,
17 17 hex,
18 18 nullid,
19 19 )
20 20
21 21 from . import (
22 22 bundle2,
23 23 changegroup as changegroupmod,
24 24 discovery,
25 25 encoding,
26 26 error,
27 27 exchange,
28 28 peer,
29 29 pushkey as pushkeymod,
30 30 pycompat,
31 31 repository,
32 32 streamclone,
33 33 util,
34 34 )
35 35
36 36 urlerr = util.urlerr
37 37 urlreq = util.urlreq
38 38
39 39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 41 'IncompatibleClient')
42 42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43 43
44 44 class remoteiterbatcher(peer.iterbatcher):
45 45 def __init__(self, remote):
46 46 super(remoteiterbatcher, self).__init__()
47 47 self._remote = remote
48 48
49 49 def __getattr__(self, name):
50 50 # Validate this method is batchable, since submit() only supports
51 51 # batchable methods.
52 52 fn = getattr(self._remote, name)
53 53 if not getattr(fn, 'batchable', None):
54 54 raise error.ProgrammingError('Attempted to batch a non-batchable '
55 55 'call to %r' % name)
56 56
57 57 return super(remoteiterbatcher, self).__getattr__(name)
58 58
59 59 def submit(self):
60 60 """Break the batch request into many patch calls and pipeline them.
61 61
62 62 This is mostly valuable over http where request sizes can be
63 63 limited, but can be used in other places as well.
64 64 """
65 65 # 2-tuple of (command, arguments) that represents what will be
66 66 # sent over the wire.
67 67 requests = []
68 68
69 69 # 4-tuple of (command, final future, @batchable generator, remote
70 70 # future).
71 71 results = []
72 72
73 73 for command, args, opts, finalfuture in self.calls:
74 74 mtd = getattr(self._remote, command)
75 75 batchable = mtd.batchable(mtd.__self__, *args, **opts)
76 76
77 77 commandargs, fremote = next(batchable)
78 78 assert fremote
79 79 requests.append((command, commandargs))
80 80 results.append((command, finalfuture, batchable, fremote))
81 81
82 82 if requests:
83 83 self._resultiter = self._remote._submitbatch(requests)
84 84
85 85 self._results = results
86 86
87 87 def results(self):
88 88 for command, finalfuture, batchable, remotefuture in self._results:
89 89 # Get the raw result, set it in the remote future, feed it
90 90 # back into the @batchable generator so it can be decoded, and
91 91 # set the result on the final future to this value.
92 92 remoteresult = next(self._resultiter)
93 93 remotefuture.set(remoteresult)
94 94 finalfuture.set(next(batchable))
95 95
96 96 # Verify our @batchable generators only emit 2 values.
97 97 try:
98 98 next(batchable)
99 99 except StopIteration:
100 100 pass
101 101 else:
102 102 raise error.ProgrammingError('%s @batchable generator emitted '
103 103 'unexpected value count' % command)
104 104
105 105 yield finalfuture.value
106 106
107 107 # Forward a couple of names from peer to make wireproto interactions
108 108 # slightly more sensible.
109 109 batchable = peer.batchable
110 110 future = peer.future
111 111
112 112 # list of nodes encoding / decoding
113 113
114 114 def decodelist(l, sep=' '):
115 115 if l:
116 116 return [bin(v) for v in l.split(sep)]
117 117 return []
118 118
119 119 def encodelist(l, sep=' '):
120 120 try:
121 121 return sep.join(map(hex, l))
122 122 except TypeError:
123 123 raise
124 124
125 125 # batched call argument encoding
126 126
127 127 def escapearg(plain):
128 128 return (plain
129 129 .replace(':', ':c')
130 130 .replace(',', ':o')
131 131 .replace(';', ':s')
132 132 .replace('=', ':e'))
133 133
134 134 def unescapearg(escaped):
135 135 return (escaped
136 136 .replace(':e', '=')
137 137 .replace(':s', ';')
138 138 .replace(':o', ',')
139 139 .replace(':c', ':'))
140 140
141 141 def encodebatchcmds(req):
142 142 """Return a ``cmds`` argument value for the ``batch`` command."""
143 143 cmds = []
144 144 for op, argsdict in req:
145 145 # Old servers didn't properly unescape argument names. So prevent
146 146 # the sending of argument names that may not be decoded properly by
147 147 # servers.
148 148 assert all(escapearg(k) == k for k in argsdict)
149 149
150 150 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
151 151 for k, v in argsdict.iteritems())
152 152 cmds.append('%s %s' % (op, args))
153 153
154 154 return ';'.join(cmds)
155 155
156 156 # mapping of options accepted by getbundle and their types
157 157 #
158 158 # Meant to be extended by extensions. It is extensions responsibility to ensure
159 159 # such options are properly processed in exchange.getbundle.
160 160 #
161 161 # supported types are:
162 162 #
163 163 # :nodes: list of binary nodes
164 164 # :csv: list of comma-separated values
165 165 # :scsv: list of comma-separated values return as set
166 166 # :plain: string with no transformation needed.
167 167 gboptsmap = {'heads': 'nodes',
168 168 'bookmarks': 'boolean',
169 169 'common': 'nodes',
170 170 'obsmarkers': 'boolean',
171 171 'phases': 'boolean',
172 172 'bundlecaps': 'scsv',
173 173 'listkeys': 'csv',
174 174 'cg': 'boolean',
175 175 'cbattempted': 'boolean',
176 176 'stream': 'boolean',
177 177 }
178 178
179 179 # client side
180 180
181 181 class wirepeer(repository.legacypeer):
182 182 """Client-side interface for communicating with a peer repository.
183 183
184 184 Methods commonly call wire protocol commands of the same name.
185 185
186 186 See also httppeer.py and sshpeer.py for protocol-specific
187 187 implementations of this interface.
188 188 """
189 189 # Begin of basewirepeer interface.
190 190
191 191 def iterbatch(self):
192 192 return remoteiterbatcher(self)
193 193
194 194 @batchable
195 195 def lookup(self, key):
196 196 self.requirecap('lookup', _('look up remote revision'))
197 197 f = future()
198 198 yield {'key': encoding.fromlocal(key)}, f
199 199 d = f.value
200 200 success, data = d[:-1].split(" ", 1)
201 201 if int(success):
202 202 yield bin(data)
203 203 else:
204 204 self._abort(error.RepoError(data))
205 205
206 206 @batchable
207 207 def heads(self):
208 208 f = future()
209 209 yield {}, f
210 210 d = f.value
211 211 try:
212 212 yield decodelist(d[:-1])
213 213 except ValueError:
214 214 self._abort(error.ResponseError(_("unexpected response:"), d))
215 215
216 216 @batchable
217 217 def known(self, nodes):
218 218 f = future()
219 219 yield {'nodes': encodelist(nodes)}, f
220 220 d = f.value
221 221 try:
222 222 yield [bool(int(b)) for b in d]
223 223 except ValueError:
224 224 self._abort(error.ResponseError(_("unexpected response:"), d))
225 225
226 226 @batchable
227 227 def branchmap(self):
228 228 f = future()
229 229 yield {}, f
230 230 d = f.value
231 231 try:
232 232 branchmap = {}
233 233 for branchpart in d.splitlines():
234 234 branchname, branchheads = branchpart.split(' ', 1)
235 235 branchname = encoding.tolocal(urlreq.unquote(branchname))
236 236 branchheads = decodelist(branchheads)
237 237 branchmap[branchname] = branchheads
238 238 yield branchmap
239 239 except TypeError:
240 240 self._abort(error.ResponseError(_("unexpected response:"), d))
241 241
242 242 @batchable
243 243 def listkeys(self, namespace):
244 244 if not self.capable('pushkey'):
245 245 yield {}, None
246 246 f = future()
247 247 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
248 248 yield {'namespace': encoding.fromlocal(namespace)}, f
249 249 d = f.value
250 250 self.ui.debug('received listkey for "%s": %i bytes\n'
251 251 % (namespace, len(d)))
252 252 yield pushkeymod.decodekeys(d)
253 253
254 254 @batchable
255 255 def pushkey(self, namespace, key, old, new):
256 256 if not self.capable('pushkey'):
257 257 yield False, None
258 258 f = future()
259 259 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
260 260 yield {'namespace': encoding.fromlocal(namespace),
261 261 'key': encoding.fromlocal(key),
262 262 'old': encoding.fromlocal(old),
263 263 'new': encoding.fromlocal(new)}, f
264 264 d = f.value
265 265 d, output = d.split('\n', 1)
266 266 try:
267 267 d = bool(int(d))
268 268 except ValueError:
269 269 raise error.ResponseError(
270 270 _('push failed (unexpected response):'), d)
271 271 for l in output.splitlines(True):
272 272 self.ui.status(_('remote: '), l)
273 273 yield d
274 274
275 275 def stream_out(self):
276 276 return self._callstream('stream_out')
277 277
278 278 def getbundle(self, source, **kwargs):
279 279 kwargs = pycompat.byteskwargs(kwargs)
280 280 self.requirecap('getbundle', _('look up remote changes'))
281 281 opts = {}
282 282 bundlecaps = kwargs.get('bundlecaps')
283 283 if bundlecaps is not None:
284 284 kwargs['bundlecaps'] = sorted(bundlecaps)
285 285 else:
286 286 bundlecaps = () # kwargs could have it to None
287 287 for key, value in kwargs.iteritems():
288 288 if value is None:
289 289 continue
290 290 keytype = gboptsmap.get(key)
291 291 if keytype is None:
292 292 raise error.ProgrammingError(
293 293 'Unexpectedly None keytype for key %s' % key)
294 294 elif keytype == 'nodes':
295 295 value = encodelist(value)
296 296 elif keytype in ('csv', 'scsv'):
297 297 value = ','.join(value)
298 298 elif keytype == 'boolean':
299 299 value = '%i' % bool(value)
300 300 elif keytype != 'plain':
301 301 raise KeyError('unknown getbundle option type %s'
302 302 % keytype)
303 303 opts[key] = value
304 304 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
305 305 if any((cap.startswith('HG2') for cap in bundlecaps)):
306 306 return bundle2.getunbundler(self.ui, f)
307 307 else:
308 308 return changegroupmod.cg1unpacker(f, 'UN')
309 309
310 310 def unbundle(self, cg, heads, url):
311 311 '''Send cg (a readable file-like object representing the
312 312 changegroup to push, typically a chunkbuffer object) to the
313 313 remote server as a bundle.
314 314
315 315 When pushing a bundle10 stream, return an integer indicating the
316 316 result of the push (see changegroup.apply()).
317 317
318 318 When pushing a bundle20 stream, return a bundle20 stream.
319 319
320 320 `url` is the url the client thinks it's pushing to, which is
321 321 visible to hooks.
322 322 '''
323 323
324 324 if heads != ['force'] and self.capable('unbundlehash'):
325 325 heads = encodelist(['hashed',
326 326 hashlib.sha1(''.join(sorted(heads))).digest()])
327 327 else:
328 328 heads = encodelist(heads)
329 329
330 330 if util.safehasattr(cg, 'deltaheader'):
331 331 # this a bundle10, do the old style call sequence
332 332 ret, output = self._callpush("unbundle", cg, heads=heads)
333 333 if ret == "":
334 334 raise error.ResponseError(
335 335 _('push failed:'), output)
336 336 try:
337 337 ret = int(ret)
338 338 except ValueError:
339 339 raise error.ResponseError(
340 340 _('push failed (unexpected response):'), ret)
341 341
342 342 for l in output.splitlines(True):
343 343 self.ui.status(_('remote: '), l)
344 344 else:
345 345 # bundle2 push. Send a stream, fetch a stream.
346 346 stream = self._calltwowaystream('unbundle', cg, heads=heads)
347 347 ret = bundle2.getunbundler(self.ui, stream)
348 348 return ret
349 349
350 350 # End of basewirepeer interface.
351 351
352 352 # Begin of baselegacywirepeer interface.
353 353
354 354 def branches(self, nodes):
355 355 n = encodelist(nodes)
356 356 d = self._call("branches", nodes=n)
357 357 try:
358 358 br = [tuple(decodelist(b)) for b in d.splitlines()]
359 359 return br
360 360 except ValueError:
361 361 self._abort(error.ResponseError(_("unexpected response:"), d))
362 362
363 363 def between(self, pairs):
364 364 batch = 8 # avoid giant requests
365 365 r = []
366 366 for i in xrange(0, len(pairs), batch):
367 367 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
368 368 d = self._call("between", pairs=n)
369 369 try:
370 370 r.extend(l and decodelist(l) or [] for l in d.splitlines())
371 371 except ValueError:
372 372 self._abort(error.ResponseError(_("unexpected response:"), d))
373 373 return r
374 374
375 375 def changegroup(self, nodes, kind):
376 376 n = encodelist(nodes)
377 377 f = self._callcompressable("changegroup", roots=n)
378 378 return changegroupmod.cg1unpacker(f, 'UN')
379 379
380 380 def changegroupsubset(self, bases, heads, kind):
381 381 self.requirecap('changegroupsubset', _('look up remote changes'))
382 382 bases = encodelist(bases)
383 383 heads = encodelist(heads)
384 384 f = self._callcompressable("changegroupsubset",
385 385 bases=bases, heads=heads)
386 386 return changegroupmod.cg1unpacker(f, 'UN')
387 387
388 388 # End of baselegacywirepeer interface.
389 389
390 390 def _submitbatch(self, req):
391 391 """run batch request <req> on the server
392 392
393 393 Returns an iterator of the raw responses from the server.
394 394 """
395 395 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
396 396 chunk = rsp.read(1024)
397 397 work = [chunk]
398 398 while chunk:
399 399 while ';' not in chunk and chunk:
400 400 chunk = rsp.read(1024)
401 401 work.append(chunk)
402 402 merged = ''.join(work)
403 403 while ';' in merged:
404 404 one, merged = merged.split(';', 1)
405 405 yield unescapearg(one)
406 406 chunk = rsp.read(1024)
407 407 work = [merged, chunk]
408 408 yield unescapearg(''.join(work))
409 409
410 410 def _submitone(self, op, args):
411 411 return self._call(op, **pycompat.strkwargs(args))
412 412
413 413 def debugwireargs(self, one, two, three=None, four=None, five=None):
414 414 # don't pass optional arguments left at their default value
415 415 opts = {}
416 416 if three is not None:
417 417 opts[r'three'] = three
418 418 if four is not None:
419 419 opts[r'four'] = four
420 420 return self._call('debugwireargs', one=one, two=two, **opts)
421 421
422 422 def _call(self, cmd, **args):
423 423 """execute <cmd> on the server
424 424
425 425 The command is expected to return a simple string.
426 426
427 427 returns the server reply as a string."""
428 428 raise NotImplementedError()
429 429
430 430 def _callstream(self, cmd, **args):
431 431 """execute <cmd> on the server
432 432
433 433 The command is expected to return a stream. Note that if the
434 434 command doesn't return a stream, _callstream behaves
435 435 differently for ssh and http peers.
436 436
437 437 returns the server reply as a file like object.
438 438 """
439 439 raise NotImplementedError()
440 440
441 441 def _callcompressable(self, cmd, **args):
442 442 """execute <cmd> on the server
443 443
444 444 The command is expected to return a stream.
445 445
446 446 The stream may have been compressed in some implementations. This
447 447 function takes care of the decompression. This is the only difference
448 448 with _callstream.
449 449
450 450 returns the server reply as a file like object.
451 451 """
452 452 raise NotImplementedError()
453 453
454 454 def _callpush(self, cmd, fp, **args):
455 455 """execute a <cmd> on server
456 456
457 457 The command is expected to be related to a push. Push has a special
458 458 return method.
459 459
460 460 returns the server reply as a (ret, output) tuple. ret is either
461 461 empty (error) or a stringified int.
462 462 """
463 463 raise NotImplementedError()
464 464
465 465 def _calltwowaystream(self, cmd, fp, **args):
466 466 """execute <cmd> on server
467 467
468 468 The command will send a stream to the server and get a stream in reply.
469 469 """
470 470 raise NotImplementedError()
471 471
472 472 def _abort(self, exception):
473 473 """clearly abort the wire protocol connection and raise the exception
474 474 """
475 475 raise NotImplementedError()
476 476
477 477 # server side
478 478
479 479 # wire protocol command can either return a string or one of these classes.
480 480 class streamres(object):
481 481 """wireproto reply: binary stream
482 482
483 483 The call was successful and the result is a stream.
484 484
485 485 Accepts a generator containing chunks of data to be sent to the client.
486 486
487 487 ``prefer_uncompressed`` indicates that the data is expected to be
488 488 uncompressable and that the stream should therefore use the ``none``
489 489 engine.
490 490 """
491 491 def __init__(self, gen=None, prefer_uncompressed=False):
492 492 self.gen = gen
493 493 self.prefer_uncompressed = prefer_uncompressed
494 494
495 495 class streamres_legacy(object):
496 496 """wireproto reply: uncompressed binary stream
497 497
498 498 The call was successful and the result is a stream.
499 499
500 500 Accepts a generator containing chunks of data to be sent to the client.
501 501
502 502 Like ``streamres``, but sends an uncompressed data for "version 1" clients
503 503 using the application/mercurial-0.1 media type.
504 504 """
505 505 def __init__(self, gen=None):
506 506 self.gen = gen
507 507
508 508 class pushres(object):
509 509 """wireproto reply: success with simple integer return
510 510
511 511 The call was successful and returned an integer contained in `self.res`.
512 512 """
513 513 def __init__(self, res, output):
514 514 self.res = res
515 515 self.output = output
516 516
517 517 class pusherr(object):
518 518 """wireproto reply: failure
519 519
520 520 The call failed. The `self.res` attribute contains the error message.
521 521 """
522 522 def __init__(self, res, output):
523 523 self.res = res
524 524 self.output = output
525 525
526 526 class ooberror(object):
527 527 """wireproto reply: failure of a batch of operation
528 528
529 529 Something failed during a batch call. The error message is stored in
530 530 `self.message`.
531 531 """
532 532 def __init__(self, message):
533 533 self.message = message
534 534
535 535 def getdispatchrepo(repo, proto, command):
536 536 """Obtain the repo used for processing wire protocol commands.
537 537
538 538 The intent of this function is to serve as a monkeypatch point for
539 539 extensions that need commands to operate on different repo views under
540 540 specialized circumstances.
541 541 """
542 542 return repo.filtered('served')
543 543
544 544 def dispatch(repo, proto, command):
545 545 repo = getdispatchrepo(repo, proto, command)
546 546 func, spec = commands[command]
547 547 args = proto.getargs(spec)
548 548 return func(repo, proto, *args)
549 549
550 550 def options(cmd, keys, others):
551 551 opts = {}
552 552 for k in keys:
553 553 if k in others:
554 554 opts[k] = others[k]
555 555 del others[k]
556 556 if others:
557 557 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
558 558 % (cmd, ",".join(others)))
559 559 return opts
560 560
561 561 def bundle1allowed(repo, action):
562 562 """Whether a bundle1 operation is allowed from the server.
563 563
564 564 Priority is:
565 565
566 566 1. server.bundle1gd.<action> (if generaldelta active)
567 567 2. server.bundle1.<action>
568 568 3. server.bundle1gd (if generaldelta active)
569 569 4. server.bundle1
570 570 """
571 571 ui = repo.ui
572 572 gd = 'generaldelta' in repo.requirements
573 573
574 574 if gd:
575 575 v = ui.configbool('server', 'bundle1gd.%s' % action)
576 576 if v is not None:
577 577 return v
578 578
579 579 v = ui.configbool('server', 'bundle1.%s' % action)
580 580 if v is not None:
581 581 return v
582 582
583 583 if gd:
584 584 v = ui.configbool('server', 'bundle1gd')
585 585 if v is not None:
586 586 return v
587 587
588 588 return ui.configbool('server', 'bundle1')
589 589
590 590 def supportedcompengines(ui, proto, role):
591 591 """Obtain the list of supported compression engines for a request."""
592 592 assert role in (util.CLIENTROLE, util.SERVERROLE)
593 593
594 594 compengines = util.compengines.supportedwireengines(role)
595 595
596 596 # Allow config to override default list and ordering.
597 597 if role == util.SERVERROLE:
598 598 configengines = ui.configlist('server', 'compressionengines')
599 599 config = 'server.compressionengines'
600 600 else:
601 601 # This is currently implemented mainly to facilitate testing. In most
602 602 # cases, the server should be in charge of choosing a compression engine
603 603 # because a server has the most to lose from a sub-optimal choice. (e.g.
604 604 # CPU DoS due to an expensive engine or a network DoS due to poor
605 605 # compression ratio).
606 606 configengines = ui.configlist('experimental',
607 607 'clientcompressionengines')
608 608 config = 'experimental.clientcompressionengines'
609 609
610 610 # No explicit config. Filter out the ones that aren't supposed to be
611 611 # advertised and return default ordering.
612 612 if not configengines:
613 613 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
614 614 return [e for e in compengines
615 615 if getattr(e.wireprotosupport(), attr) > 0]
616 616
617 617 # If compression engines are listed in the config, assume there is a good
618 618 # reason for it (like server operators wanting to achieve specific
619 619 # performance characteristics). So fail fast if the config references
620 620 # unusable compression engines.
621 621 validnames = set(e.name() for e in compengines)
622 622 invalidnames = set(e for e in configengines if e not in validnames)
623 623 if invalidnames:
624 624 raise error.Abort(_('invalid compression engine defined in %s: %s') %
625 625 (config, ', '.join(sorted(invalidnames))))
626 626
627 627 compengines = [e for e in compengines if e.name() in configengines]
628 628 compengines = sorted(compengines,
629 629 key=lambda e: configengines.index(e.name()))
630 630
631 631 if not compengines:
632 632 raise error.Abort(_('%s config option does not specify any known '
633 633 'compression engines') % config,
634 634 hint=_('usable compression engines: %s') %
635 635 ', '.sorted(validnames))
636 636
637 637 return compengines
638 638
639 639 class commandentry(object):
640 640 """Represents a declared wire protocol command."""
641 641 def __init__(self, func, args=''):
642 642 self.func = func
643 643 self.args = args
644 644
645 645 def _merge(self, func, args):
646 646 """Merge this instance with an incoming 2-tuple.
647 647
648 648 This is called when a caller using the old 2-tuple API attempts
649 649 to replace an instance. The incoming values are merged with
650 650 data not captured by the 2-tuple and a new instance containing
651 651 the union of the two objects is returned.
652 652 """
653 653 return commandentry(func, args)
654 654
655 655 # Old code treats instances as 2-tuples. So expose that interface.
656 656 def __iter__(self):
657 657 yield self.func
658 658 yield self.args
659 659
660 660 def __getitem__(self, i):
661 661 if i == 0:
662 662 return self.func
663 663 elif i == 1:
664 664 return self.args
665 665 else:
666 666 raise IndexError('can only access elements 0 and 1')
667 667
668 668 class commanddict(dict):
669 669 """Container for registered wire protocol commands.
670 670
671 671 It behaves like a dict. But __setitem__ is overwritten to allow silent
672 672 coercion of values from 2-tuples for API compatibility.
673 673 """
674 674 def __setitem__(self, k, v):
675 675 if isinstance(v, commandentry):
676 676 pass
677 677 # Cast 2-tuples to commandentry instances.
678 678 elif isinstance(v, tuple):
679 679 if len(v) != 2:
680 680 raise ValueError('command tuples must have exactly 2 elements')
681 681
682 682 # It is common for extensions to wrap wire protocol commands via
683 683 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
684 684 # doing this aren't aware of the new API that uses objects to store
685 685 # command entries, we automatically merge old state with new.
686 686 if k in self:
687 687 v = self[k]._merge(v[0], v[1])
688 688 else:
689 689 v = commandentry(v[0], v[1])
690 690 else:
691 691 raise ValueError('command entries must be commandentry instances '
692 692 'or 2-tuples')
693 693
694 694 return super(commanddict, self).__setitem__(k, v)
695 695
696 696 def commandavailable(self, command, proto):
697 697 """Determine if a command is available for the requested protocol."""
698 698 # For now, commands are available for all protocols. So do a simple
699 699 # membership test.
700 700 return command in self
701 701
702 702 commands = commanddict()
703 703
704 704 def wireprotocommand(name, args=''):
705 705 """Decorator to declare a wire protocol command.
706 706
707 707 ``name`` is the name of the wire protocol command being provided.
708 708
709 709 ``args`` is a space-delimited list of named arguments that the command
710 710 accepts. ``*`` is a special value that says to accept all arguments.
711 711 """
712 712 def register(func):
713 713 commands[name] = commandentry(func, args)
714 714 return func
715 715 return register
716 716
717 717 @wireprotocommand('batch', 'cmds *')
718 718 def batch(repo, proto, cmds, others):
719 719 repo = repo.filtered("served")
720 720 res = []
721 721 for pair in cmds.split(';'):
722 722 op, args = pair.split(' ', 1)
723 723 vals = {}
724 724 for a in args.split(','):
725 725 if a:
726 726 n, v = a.split('=')
727 727 vals[unescapearg(n)] = unescapearg(v)
728 728 func, spec = commands[op]
729 729 if spec:
730 730 keys = spec.split()
731 731 data = {}
732 732 for k in keys:
733 733 if k == '*':
734 734 star = {}
735 735 for key in vals.keys():
736 736 if key not in keys:
737 737 star[key] = vals[key]
738 738 data['*'] = star
739 739 else:
740 740 data[k] = vals[k]
741 741 result = func(repo, proto, *[data[k] for k in keys])
742 742 else:
743 743 result = func(repo, proto)
744 744 if isinstance(result, ooberror):
745 745 return result
746 746 res.append(escapearg(result))
747 747 return ';'.join(res)
748 748
749 749 @wireprotocommand('between', 'pairs')
750 750 def between(repo, proto, pairs):
751 751 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
752 752 r = []
753 753 for b in repo.between(pairs):
754 754 r.append(encodelist(b) + "\n")
755 755 return "".join(r)
756 756
757 757 @wireprotocommand('branchmap')
758 758 def branchmap(repo, proto):
759 759 branchmap = repo.branchmap()
760 760 heads = []
761 761 for branch, nodes in branchmap.iteritems():
762 762 branchname = urlreq.quote(encoding.fromlocal(branch))
763 763 branchnodes = encodelist(nodes)
764 764 heads.append('%s %s' % (branchname, branchnodes))
765 765 return '\n'.join(heads)
766 766
767 767 @wireprotocommand('branches', 'nodes')
768 768 def branches(repo, proto, nodes):
769 769 nodes = decodelist(nodes)
770 770 r = []
771 771 for b in repo.branches(nodes):
772 772 r.append(encodelist(b) + "\n")
773 773 return "".join(r)
774 774
775 775 @wireprotocommand('clonebundles', '')
776 776 def clonebundles(repo, proto):
777 777 """Server command for returning info for available bundles to seed clones.
778 778
779 779 Clients will parse this response and determine what bundle to fetch.
780 780
781 781 Extensions may wrap this command to filter or dynamically emit data
782 782 depending on the request. e.g. you could advertise URLs for the closest
783 783 data center given the client's IP address.
784 784 """
785 785 return repo.vfs.tryread('clonebundles.manifest')
786 786
787 787 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
788 788 'known', 'getbundle', 'unbundlehash', 'batch']
789 789
790 790 def _capabilities(repo, proto):
791 791 """return a list of capabilities for a repo
792 792
793 793 This function exists to allow extensions to easily wrap capabilities
794 794 computation
795 795
796 796 - returns a lists: easy to alter
797 797 - change done here will be propagated to both `capabilities` and `hello`
798 798 command without any other action needed.
799 799 """
800 800 # copy to prevent modification of the global list
801 801 caps = list(wireprotocaps)
802 802 if streamclone.allowservergeneration(repo):
803 803 if repo.ui.configbool('server', 'preferuncompressed'):
804 804 caps.append('stream-preferred')
805 805 requiredformats = repo.requirements & repo.supportedformats
806 806 # if our local revlogs are just revlogv1, add 'stream' cap
807 807 if not requiredformats - {'revlogv1'}:
808 808 caps.append('stream')
809 809 # otherwise, add 'streamreqs' detailing our local revlog format
810 810 else:
811 811 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
812 812 if repo.ui.configbool('experimental', 'bundle2-advertise'):
813 813 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
814 814 caps.append('bundle2=' + urlreq.quote(capsblob))
815 815 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
816 816
817 817 if proto.name == 'http':
818 818 caps.append('httpheader=%d' %
819 819 repo.ui.configint('server', 'maxhttpheaderlen'))
820 820 if repo.ui.configbool('experimental', 'httppostargs'):
821 821 caps.append('httppostargs')
822 822
823 823 # FUTURE advertise 0.2rx once support is implemented
824 824 # FUTURE advertise minrx and mintx after consulting config option
825 825 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
826 826
827 827 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
828 828 if compengines:
829 829 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
830 830 for e in compengines)
831 831 caps.append('compression=%s' % comptypes)
832 832
833 833 return caps
834 834
835 835 # If you are writing an extension and consider wrapping this function. Wrap
836 836 # `_capabilities` instead.
837 837 @wireprotocommand('capabilities')
838 838 def capabilities(repo, proto):
839 839 return ' '.join(_capabilities(repo, proto))
840 840
841 841 @wireprotocommand('changegroup', 'roots')
842 842 def changegroup(repo, proto, roots):
843 843 nodes = decodelist(roots)
844 844 outgoing = discovery.outgoing(repo, missingroots=nodes,
845 845 missingheads=repo.heads())
846 846 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
847 847 gen = iter(lambda: cg.read(32768), '')
848 848 return streamres(gen=gen)
849 849
850 850 @wireprotocommand('changegroupsubset', 'bases heads')
851 851 def changegroupsubset(repo, proto, bases, heads):
852 852 bases = decodelist(bases)
853 853 heads = decodelist(heads)
854 854 outgoing = discovery.outgoing(repo, missingroots=bases,
855 855 missingheads=heads)
856 856 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
857 857 gen = iter(lambda: cg.read(32768), '')
858 858 return streamres(gen=gen)
859 859
860 860 @wireprotocommand('debugwireargs', 'one two *')
861 861 def debugwireargs(repo, proto, one, two, others):
862 862 # only accept optional args from the known set
863 863 opts = options('debugwireargs', ['three', 'four'], others)
864 864 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
865 865
866 866 @wireprotocommand('getbundle', '*')
867 867 def getbundle(repo, proto, others):
868 868 opts = options('getbundle', gboptsmap.keys(), others)
869 869 for k, v in opts.iteritems():
870 870 keytype = gboptsmap[k]
871 871 if keytype == 'nodes':
872 872 opts[k] = decodelist(v)
873 873 elif keytype == 'csv':
874 874 opts[k] = list(v.split(','))
875 875 elif keytype == 'scsv':
876 876 opts[k] = set(v.split(','))
877 877 elif keytype == 'boolean':
878 878 # Client should serialize False as '0', which is a non-empty string
879 879 # so it evaluates as a True bool.
880 880 if v == '0':
881 881 opts[k] = False
882 882 else:
883 883 opts[k] = bool(v)
884 884 elif keytype != 'plain':
885 885 raise KeyError('unknown getbundle option type %s'
886 886 % keytype)
887 887
888 888 if not bundle1allowed(repo, 'pull'):
889 889 if not exchange.bundle2requested(opts.get('bundlecaps')):
890 890 if proto.name == 'http':
891 891 return ooberror(bundle2required)
892 892 raise error.Abort(bundle2requiredmain,
893 893 hint=bundle2requiredhint)
894 894
895 895 prefercompressed = True
896 896
897 897 try:
898 898 if repo.ui.configbool('server', 'disablefullbundle'):
899 899 # Check to see if this is a full clone.
900 900 clheads = set(repo.changelog.heads())
901 901 changegroup = opts.get('cg', True)
902 902 heads = set(opts.get('heads', set()))
903 903 common = set(opts.get('common', set()))
904 904 common.discard(nullid)
905 905 if changegroup and not common and clheads == heads:
906 906 raise error.Abort(
907 907 _('server has pull-based clones disabled'),
908 908 hint=_('remove --pull if specified or upgrade Mercurial'))
909 909
910 910 info, chunks = exchange.getbundlechunks(repo, 'serve',
911 911 **pycompat.strkwargs(opts))
912 912 prefercompressed = info.get('prefercompressed', True)
913 913 except error.Abort as exc:
914 914 # cleanly forward Abort error to the client
915 915 if not exchange.bundle2requested(opts.get('bundlecaps')):
916 916 if proto.name == 'http':
917 917 return ooberror(str(exc) + '\n')
918 918 raise # cannot do better for bundle1 + ssh
919 919 # bundle2 request expect a bundle2 reply
920 920 bundler = bundle2.bundle20(repo.ui)
921 921 manargs = [('message', str(exc))]
922 922 advargs = []
923 923 if exc.hint is not None:
924 924 advargs.append(('hint', exc.hint))
925 925 bundler.addpart(bundle2.bundlepart('error:abort',
926 926 manargs, advargs))
927 927 chunks = bundler.getchunks()
928 928 prefercompressed = False
929 929
930 930 return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
931 931
932 932 @wireprotocommand('heads')
933 933 def heads(repo, proto):
934 934 h = repo.heads()
935 935 return encodelist(h) + "\n"
936 936
937 937 @wireprotocommand('hello')
938 938 def hello(repo, proto):
939 939 '''the hello command returns a set of lines describing various
940 940 interesting things about the server, in an RFC822-like format.
941 941 Currently the only one defined is "capabilities", which
942 942 consists of a line in the form:
943 943
944 944 capabilities: space separated list of tokens
945 945 '''
946 946 return "capabilities: %s\n" % (capabilities(repo, proto))
947 947
948 948 @wireprotocommand('listkeys', 'namespace')
949 949 def listkeys(repo, proto, namespace):
950 950 d = repo.listkeys(encoding.tolocal(namespace)).items()
951 951 return pushkeymod.encodekeys(d)
952 952
953 953 @wireprotocommand('lookup', 'key')
954 954 def lookup(repo, proto, key):
955 955 try:
956 956 k = encoding.tolocal(key)
957 957 c = repo[k]
958 958 r = c.hex()
959 959 success = 1
960 960 except Exception as inst:
961 961 r = str(inst)
962 962 success = 0
963 963 return "%d %s\n" % (success, r)
964 964
965 965 @wireprotocommand('known', 'nodes *')
966 966 def known(repo, proto, nodes, others):
967 967 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
968 968
969 969 @wireprotocommand('pushkey', 'namespace key old new')
970 970 def pushkey(repo, proto, namespace, key, old, new):
971 971 # compatibility with pre-1.8 clients which were accidentally
972 972 # sending raw binary nodes rather than utf-8-encoded hex
973 973 if len(new) == 20 and util.escapestr(new) != new:
974 974 # looks like it could be a binary node
975 975 try:
976 976 new.decode('utf-8')
977 977 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
978 978 except UnicodeDecodeError:
979 979 pass # binary, leave unmodified
980 980 else:
981 981 new = encoding.tolocal(new) # normal path
982 982
983 983 with proto.mayberedirectstdio() as output:
984 984 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
985 985 encoding.tolocal(old), new) or False
986 986
987 987 output = output.getvalue() if output else ''
988 988 return '%s\n%s' % (int(r), output)
989 989
990 990 @wireprotocommand('stream_out')
991 991 def stream(repo, proto):
992 992 '''If the server supports streaming clone, it advertises the "stream"
993 993 capability with a value representing the version and flags of the repo
994 994 it is serving. Client checks to see if it understands the format.
995 995 '''
996 996 return streamres_legacy(streamclone.generatev1wireproto(repo))
997 997
998 998 @wireprotocommand('unbundle', 'heads')
999 999 def unbundle(repo, proto, heads):
1000 1000 their_heads = decodelist(heads)
1001 1001
1002 1002 with proto.mayberedirectstdio() as output:
1003 1003 try:
1004 1004 exchange.check_heads(repo, their_heads, 'preparing changes')
1005 1005
1006 1006 # write bundle data to temporary file because it can be big
1007 1007 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1008 1008 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1009 1009 r = 0
1010 1010 try:
1011 proto.getfile(fp)
1011 proto.forwardpayload(fp)
1012 1012 fp.seek(0)
1013 1013 gen = exchange.readbundle(repo.ui, fp, None)
1014 1014 if (isinstance(gen, changegroupmod.cg1unpacker)
1015 1015 and not bundle1allowed(repo, 'push')):
1016 1016 if proto.name == 'http':
1017 1017 # need to special case http because stderr do not get to
1018 1018 # the http client on failed push so we need to abuse
1019 1019 # some other error type to make sure the message get to
1020 1020 # the user.
1021 1021 return ooberror(bundle2required)
1022 1022 raise error.Abort(bundle2requiredmain,
1023 1023 hint=bundle2requiredhint)
1024 1024
1025 1025 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1026 1026 proto.client())
1027 1027 if util.safehasattr(r, 'addpart'):
1028 1028 # The return looks streamable, we are in the bundle2 case
1029 1029 # and should return a stream.
1030 1030 return streamres_legacy(gen=r.getchunks())
1031 1031 return pushres(r, output.getvalue() if output else '')
1032 1032
1033 1033 finally:
1034 1034 fp.close()
1035 1035 os.unlink(tempname)
1036 1036
1037 1037 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1038 1038 # handle non-bundle2 case first
1039 1039 if not getattr(exc, 'duringunbundle2', False):
1040 1040 try:
1041 1041 raise
1042 1042 except error.Abort:
1043 1043 # The old code we moved used util.stderr directly.
1044 1044 # We did not change it to minimise code change.
1045 1045 # This need to be moved to something proper.
1046 1046 # Feel free to do it.
1047 1047 util.stderr.write("abort: %s\n" % exc)
1048 1048 if exc.hint is not None:
1049 1049 util.stderr.write("(%s)\n" % exc.hint)
1050 1050 return pushres(0, output.getvalue() if output else '')
1051 1051 except error.PushRaced:
1052 1052 return pusherr(str(exc),
1053 1053 output.getvalue() if output else '')
1054 1054
1055 1055 bundler = bundle2.bundle20(repo.ui)
1056 1056 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1057 1057 bundler.addpart(out)
1058 1058 try:
1059 1059 try:
1060 1060 raise
1061 1061 except error.PushkeyFailed as exc:
1062 1062 # check client caps
1063 1063 remotecaps = getattr(exc, '_replycaps', None)
1064 1064 if (remotecaps is not None
1065 1065 and 'pushkey' not in remotecaps.get('error', ())):
1066 1066 # no support remote side, fallback to Abort handler.
1067 1067 raise
1068 1068 part = bundler.newpart('error:pushkey')
1069 1069 part.addparam('in-reply-to', exc.partid)
1070 1070 if exc.namespace is not None:
1071 1071 part.addparam('namespace', exc.namespace,
1072 1072 mandatory=False)
1073 1073 if exc.key is not None:
1074 1074 part.addparam('key', exc.key, mandatory=False)
1075 1075 if exc.new is not None:
1076 1076 part.addparam('new', exc.new, mandatory=False)
1077 1077 if exc.old is not None:
1078 1078 part.addparam('old', exc.old, mandatory=False)
1079 1079 if exc.ret is not None:
1080 1080 part.addparam('ret', exc.ret, mandatory=False)
1081 1081 except error.BundleValueError as exc:
1082 1082 errpart = bundler.newpart('error:unsupportedcontent')
1083 1083 if exc.parttype is not None:
1084 1084 errpart.addparam('parttype', exc.parttype)
1085 1085 if exc.params:
1086 1086 errpart.addparam('params', '\0'.join(exc.params))
1087 1087 except error.Abort as exc:
1088 1088 manargs = [('message', str(exc))]
1089 1089 advargs = []
1090 1090 if exc.hint is not None:
1091 1091 advargs.append(('hint', exc.hint))
1092 1092 bundler.addpart(bundle2.bundlepart('error:abort',
1093 1093 manargs, advargs))
1094 1094 except error.PushRaced as exc:
1095 1095 bundler.newpart('error:pushraced', [('message', str(exc))])
1096 1096 return streamres_legacy(gen=bundler.getchunks())
@@ -1,454 +1,455 b''
1 1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 3 #
4 4 # This software may be used and distributed according to the terms of the
5 5 # GNU General Public License version 2 or any later version.
6 6
7 7 from __future__ import absolute_import
8 8
9 9 import abc
10 10 import cgi
11 11 import contextlib
12 12 import struct
13 13 import sys
14 14
15 15 from .i18n import _
16 16 from . import (
17 17 encoding,
18 18 error,
19 19 hook,
20 20 pycompat,
21 21 util,
22 22 wireproto,
23 23 )
24 24
25 25 stringio = util.stringio
26 26
27 27 urlerr = util.urlerr
28 28 urlreq = util.urlreq
29 29
30 30 HTTP_OK = 200
31 31
32 32 HGTYPE = 'application/mercurial-0.1'
33 33 HGTYPE2 = 'application/mercurial-0.2'
34 34 HGERRTYPE = 'application/hg-error'
35 35
36 36 # Names of the SSH protocol implementations.
37 37 SSHV1 = 'ssh-v1'
38 38 # This is advertised over the wire. Incremental the counter at the end
39 39 # to reflect BC breakages.
40 40 SSHV2 = 'exp-ssh-v2-0001'
41 41
42 42 class baseprotocolhandler(object):
43 43 """Abstract base class for wire protocol handlers.
44 44
45 45 A wire protocol handler serves as an interface between protocol command
46 46 handlers and the wire protocol transport layer. Protocol handlers provide
47 47 methods to read command arguments, redirect stdio for the duration of
48 48 the request, handle response types, etc.
49 49 """
50 50
51 51 __metaclass__ = abc.ABCMeta
52 52
53 53 @abc.abstractproperty
54 54 def name(self):
55 55 """The name of the protocol implementation.
56 56
57 57 Used for uniquely identifying the transport type.
58 58 """
59 59
60 60 @abc.abstractmethod
61 61 def getargs(self, args):
62 62 """return the value for arguments in <args>
63 63
64 64 returns a list of values (same order as <args>)"""
65 65
66 66 @abc.abstractmethod
67 def getfile(self, fp):
68 """write the whole content of a file into a file like object
67 def forwardpayload(self, fp):
68 """Read the raw payload and forward to a file.
69 69
70 The file is in the form::
71
72 (<chunk-size>\n<chunk>)+0\n
73
74 chunk size is the ascii version of the int.
70 The payload is read in full before the function returns.
75 71 """
76 72
77 73 @abc.abstractmethod
78 74 def mayberedirectstdio(self):
79 75 """Context manager to possibly redirect stdio.
80 76
81 77 The context manager yields a file-object like object that receives
82 78 stdout and stderr output when the context manager is active. Or it
83 79 yields ``None`` if no I/O redirection occurs.
84 80
85 81 The intent of this context manager is to capture stdio output
86 82 so it may be sent in the response. Some transports support streaming
87 83 stdio to the client in real time. For these transports, stdio output
88 84 won't be captured.
89 85 """
90 86
91 87 @abc.abstractmethod
92 88 def client(self):
93 89 """Returns a string representation of this client (as bytes)."""
94 90
95 91 def decodevaluefromheaders(req, headerprefix):
96 92 """Decode a long value from multiple HTTP request headers.
97 93
98 94 Returns the value as a bytes, not a str.
99 95 """
100 96 chunks = []
101 97 i = 1
102 98 prefix = headerprefix.upper().replace(r'-', r'_')
103 99 while True:
104 100 v = req.env.get(r'HTTP_%s_%d' % (prefix, i))
105 101 if v is None:
106 102 break
107 103 chunks.append(pycompat.bytesurl(v))
108 104 i += 1
109 105
110 106 return ''.join(chunks)
111 107
112 108 class webproto(baseprotocolhandler):
113 109 def __init__(self, req, ui):
114 110 self._req = req
115 111 self._ui = ui
116 112
117 113 @property
118 114 def name(self):
119 115 return 'http'
120 116
121 117 def getargs(self, args):
122 118 knownargs = self._args()
123 119 data = {}
124 120 keys = args.split()
125 121 for k in keys:
126 122 if k == '*':
127 123 star = {}
128 124 for key in knownargs.keys():
129 125 if key != 'cmd' and key not in keys:
130 126 star[key] = knownargs[key][0]
131 127 data['*'] = star
132 128 else:
133 129 data[k] = knownargs[k][0]
134 130 return [data[k] for k in keys]
135 131
136 132 def _args(self):
137 133 args = util.rapply(pycompat.bytesurl, self._req.form.copy())
138 134 postlen = int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
139 135 if postlen:
140 136 args.update(cgi.parse_qs(
141 137 self._req.read(postlen), keep_blank_values=True))
142 138 return args
143 139
144 140 argvalue = decodevaluefromheaders(self._req, r'X-HgArg')
145 141 args.update(cgi.parse_qs(argvalue, keep_blank_values=True))
146 142 return args
147 143
148 def getfile(self, fp):
144 def forwardpayload(self, fp):
149 145 length = int(self._req.env[r'CONTENT_LENGTH'])
150 146 # If httppostargs is used, we need to read Content-Length
151 147 # minus the amount that was consumed by args.
152 148 length -= int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
153 149 for s in util.filechunkiter(self._req, limit=length):
154 150 fp.write(s)
155 151
156 152 @contextlib.contextmanager
157 153 def mayberedirectstdio(self):
158 154 oldout = self._ui.fout
159 155 olderr = self._ui.ferr
160 156
161 157 out = util.stringio()
162 158
163 159 try:
164 160 self._ui.fout = out
165 161 self._ui.ferr = out
166 162 yield out
167 163 finally:
168 164 self._ui.fout = oldout
169 165 self._ui.ferr = olderr
170 166
171 167 def client(self):
172 168 return 'remote:%s:%s:%s' % (
173 169 self._req.env.get('wsgi.url_scheme') or 'http',
174 170 urlreq.quote(self._req.env.get('REMOTE_HOST', '')),
175 171 urlreq.quote(self._req.env.get('REMOTE_USER', '')))
176 172
177 173 def responsetype(self, prefer_uncompressed):
178 174 """Determine the appropriate response type and compression settings.
179 175
180 176 Returns a tuple of (mediatype, compengine, engineopts).
181 177 """
182 178 # Determine the response media type and compression engine based
183 179 # on the request parameters.
184 180 protocaps = decodevaluefromheaders(self._req, r'X-HgProto').split(' ')
185 181
186 182 if '0.2' in protocaps:
187 183 # All clients are expected to support uncompressed data.
188 184 if prefer_uncompressed:
189 185 return HGTYPE2, util._noopengine(), {}
190 186
191 187 # Default as defined by wire protocol spec.
192 188 compformats = ['zlib', 'none']
193 189 for cap in protocaps:
194 190 if cap.startswith('comp='):
195 191 compformats = cap[5:].split(',')
196 192 break
197 193
198 194 # Now find an agreed upon compression format.
199 195 for engine in wireproto.supportedcompengines(self._ui, self,
200 196 util.SERVERROLE):
201 197 if engine.wireprotosupport().name in compformats:
202 198 opts = {}
203 199 level = self._ui.configint('server',
204 200 '%slevel' % engine.name())
205 201 if level is not None:
206 202 opts['level'] = level
207 203
208 204 return HGTYPE2, engine, opts
209 205
210 206 # No mutually supported compression format. Fall back to the
211 207 # legacy protocol.
212 208
213 209 # Don't allow untrusted settings because disabling compression or
214 210 # setting a very high compression level could lead to flooding
215 211 # the server's network or CPU.
216 212 opts = {'level': self._ui.configint('server', 'zliblevel')}
217 213 return HGTYPE, util.compengines['zlib'], opts
218 214
219 215 def iscmd(cmd):
220 216 return cmd in wireproto.commands
221 217
222 218 def parsehttprequest(repo, req, query):
223 219 """Parse the HTTP request for a wire protocol request.
224 220
225 221 If the current request appears to be a wire protocol request, this
226 222 function returns a dict with details about that request, including
227 223 an ``abstractprotocolserver`` instance suitable for handling the
228 224 request. Otherwise, ``None`` is returned.
229 225
230 226 ``req`` is a ``wsgirequest`` instance.
231 227 """
232 228 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
233 229 # string parameter. If it isn't present, this isn't a wire protocol
234 230 # request.
235 231 if r'cmd' not in req.form:
236 232 return None
237 233
238 234 cmd = pycompat.sysbytes(req.form[r'cmd'][0])
239 235
240 236 # The "cmd" request parameter is used by both the wire protocol and hgweb.
241 237 # While not all wire protocol commands are available for all transports,
242 238 # if we see a "cmd" value that resembles a known wire protocol command, we
243 239 # route it to a protocol handler. This is better than routing possible
244 240 # wire protocol requests to hgweb because it prevents hgweb from using
245 241 # known wire protocol commands and it is less confusing for machine
246 242 # clients.
247 243 if cmd not in wireproto.commands:
248 244 return None
249 245
250 246 proto = webproto(req, repo.ui)
251 247
252 248 return {
253 249 'cmd': cmd,
254 250 'proto': proto,
255 251 'dispatch': lambda: _callhttp(repo, req, proto, cmd),
256 252 'handleerror': lambda ex: _handlehttperror(ex, req, cmd),
257 253 }
258 254
259 255 def _callhttp(repo, req, proto, cmd):
260 256 def genversion2(gen, engine, engineopts):
261 257 # application/mercurial-0.2 always sends a payload header
262 258 # identifying the compression engine.
263 259 name = engine.wireprotosupport().name
264 260 assert 0 < len(name) < 256
265 261 yield struct.pack('B', len(name))
266 262 yield name
267 263
268 264 for chunk in gen:
269 265 yield chunk
270 266
271 267 rsp = wireproto.dispatch(repo, proto, cmd)
272 268
273 269 if not wireproto.commands.commandavailable(cmd, proto):
274 270 req.respond(HTTP_OK, HGERRTYPE,
275 271 body=_('requested wire protocol command is not available '
276 272 'over HTTP'))
277 273 return []
278 274
279 275 if isinstance(rsp, bytes):
280 276 req.respond(HTTP_OK, HGTYPE, body=rsp)
281 277 return []
282 278 elif isinstance(rsp, wireproto.streamres_legacy):
283 279 gen = rsp.gen
284 280 req.respond(HTTP_OK, HGTYPE)
285 281 return gen
286 282 elif isinstance(rsp, wireproto.streamres):
287 283 gen = rsp.gen
288 284
289 285 # This code for compression should not be streamres specific. It
290 286 # is here because we only compress streamres at the moment.
291 287 mediatype, engine, engineopts = proto.responsetype(
292 288 rsp.prefer_uncompressed)
293 289 gen = engine.compressstream(gen, engineopts)
294 290
295 291 if mediatype == HGTYPE2:
296 292 gen = genversion2(gen, engine, engineopts)
297 293
298 294 req.respond(HTTP_OK, mediatype)
299 295 return gen
300 296 elif isinstance(rsp, wireproto.pushres):
301 297 rsp = '%d\n%s' % (rsp.res, rsp.output)
302 298 req.respond(HTTP_OK, HGTYPE, body=rsp)
303 299 return []
304 300 elif isinstance(rsp, wireproto.pusherr):
305 301 # This is the httplib workaround documented in _handlehttperror().
306 302 req.drain()
307 303
308 304 rsp = '0\n%s\n' % rsp.res
309 305 req.respond(HTTP_OK, HGTYPE, body=rsp)
310 306 return []
311 307 elif isinstance(rsp, wireproto.ooberror):
312 308 rsp = rsp.message
313 309 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
314 310 return []
315 311 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
316 312
317 313 def _handlehttperror(e, req, cmd):
318 314 """Called when an ErrorResponse is raised during HTTP request processing."""
319 315
320 316 # Clients using Python's httplib are stateful: the HTTP client
321 317 # won't process an HTTP response until all request data is
322 318 # sent to the server. The intent of this code is to ensure
323 319 # we always read HTTP request data from the client, thus
324 320 # ensuring httplib transitions to a state that allows it to read
325 321 # the HTTP response. In other words, it helps prevent deadlocks
326 322 # on clients using httplib.
327 323
328 324 if (req.env[r'REQUEST_METHOD'] == r'POST' and
329 325 # But not if Expect: 100-continue is being used.
330 326 (req.env.get('HTTP_EXPECT',
331 327 '').lower() != '100-continue') or
332 328 # Or the non-httplib HTTP library is being advertised by
333 329 # the client.
334 330 req.env.get('X-HgHttp2', '')):
335 331 req.drain()
336 332 else:
337 333 req.headers.append((r'Connection', r'Close'))
338 334
339 335 # TODO This response body assumes the failed command was
340 336 # "unbundle." That assumption is not always valid.
341 337 req.respond(e, HGTYPE, body='0\n%s\n' % e)
342 338
343 339 return ''
344 340
345 341 def _sshv1respondbytes(fout, value):
346 342 """Send a bytes response for protocol version 1."""
347 343 fout.write('%d\n' % len(value))
348 344 fout.write(value)
349 345 fout.flush()
350 346
351 347 def _sshv1respondstream(fout, source):
352 348 write = fout.write
353 349 for chunk in source.gen:
354 350 write(chunk)
355 351 fout.flush()
356 352
357 353 def _sshv1respondooberror(fout, ferr, rsp):
358 354 ferr.write(b'%s\n-\n' % rsp)
359 355 ferr.flush()
360 356 fout.write(b'\n')
361 357 fout.flush()
362 358
363 359 class sshv1protocolhandler(baseprotocolhandler):
364 360 """Handler for requests services via version 1 of SSH protocol."""
365 361 def __init__(self, ui, fin, fout):
366 362 self._ui = ui
367 363 self._fin = fin
368 364 self._fout = fout
369 365
370 366 @property
371 367 def name(self):
372 368 return 'ssh'
373 369
374 370 def getargs(self, args):
375 371 data = {}
376 372 keys = args.split()
377 373 for n in xrange(len(keys)):
378 374 argline = self._fin.readline()[:-1]
379 375 arg, l = argline.split()
380 376 if arg not in keys:
381 377 raise error.Abort(_("unexpected parameter %r") % arg)
382 378 if arg == '*':
383 379 star = {}
384 380 for k in xrange(int(l)):
385 381 argline = self._fin.readline()[:-1]
386 382 arg, l = argline.split()
387 383 val = self._fin.read(int(l))
388 384 star[arg] = val
389 385 data['*'] = star
390 386 else:
391 387 val = self._fin.read(int(l))
392 388 data[arg] = val
393 389 return [data[k] for k in keys]
394 390
395 def getfile(self, fpout):
391 def forwardpayload(self, fpout):
392 # The file is in the form:
393 #
394 # <chunk size>\n<chunk>
395 # ...
396 # 0\n
396 397 _sshv1respondbytes(self._fout, b'')
397 398 count = int(self._fin.readline())
398 399 while count:
399 400 fpout.write(self._fin.read(count))
400 401 count = int(self._fin.readline())
401 402
402 403 @contextlib.contextmanager
403 404 def mayberedirectstdio(self):
404 405 yield None
405 406
406 407 def client(self):
407 408 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
408 409 return 'remote:ssh:' + client
409 410
410 411 class sshserver(object):
411 412 def __init__(self, ui, repo):
412 413 self._ui = ui
413 414 self._repo = repo
414 415 self._fin = ui.fin
415 416 self._fout = ui.fout
416 417
417 418 hook.redirect(True)
418 419 ui.fout = repo.ui.fout = ui.ferr
419 420
420 421 # Prevent insertion/deletion of CRs
421 422 util.setbinary(self._fin)
422 423 util.setbinary(self._fout)
423 424
424 425 self._proto = sshv1protocolhandler(self._ui, self._fin, self._fout)
425 426
426 427 def serve_forever(self):
427 428 while self.serve_one():
428 429 pass
429 430 sys.exit(0)
430 431
431 432 def serve_one(self):
432 433 cmd = self._fin.readline()[:-1]
433 434 if cmd and wireproto.commands.commandavailable(cmd, self._proto):
434 435 rsp = wireproto.dispatch(self._repo, self._proto, cmd)
435 436
436 437 if isinstance(rsp, bytes):
437 438 _sshv1respondbytes(self._fout, rsp)
438 439 elif isinstance(rsp, wireproto.streamres):
439 440 _sshv1respondstream(self._fout, rsp)
440 441 elif isinstance(rsp, wireproto.streamres_legacy):
441 442 _sshv1respondstream(self._fout, rsp)
442 443 elif isinstance(rsp, wireproto.pushres):
443 444 _sshv1respondbytes(self._fout, b'')
444 445 _sshv1respondbytes(self._fout, bytes(rsp.res))
445 446 elif isinstance(rsp, wireproto.pusherr):
446 447 _sshv1respondbytes(self._fout, rsp.res)
447 448 elif isinstance(rsp, wireproto.ooberror):
448 449 _sshv1respondooberror(self._fout, self._ui.ferr, rsp.message)
449 450 else:
450 451 raise error.ProgrammingError('unhandled response type from '
451 452 'wire protocol command: %s' % rsp)
452 453 elif cmd:
453 454 _sshv1respondbytes(self._fout, b'')
454 455 return cmd != ''
General Comments 0
You need to be logged in to leave comments. Login now