##// END OF EJS Templates
wireproto: add transport specific capabilities in the transport...
Gregory Szorc -
r36631:6e585bca default
parent child Browse files
Show More
@@ -1,1109 +1,1093 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import os
11 import os
12 import tempfile
12 import tempfile
13
13
14 from .i18n import _
14 from .i18n import _
15 from .node import (
15 from .node import (
16 bin,
16 bin,
17 hex,
17 hex,
18 nullid,
18 nullid,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 discovery,
24 discovery,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 pycompat,
30 pycompat,
31 repository,
31 repository,
32 streamclone,
32 streamclone,
33 util,
33 util,
34 wireprototypes,
34 wireprototypes,
35 )
35 )
36
36
37 urlerr = util.urlerr
37 urlerr = util.urlerr
38 urlreq = util.urlreq
38 urlreq = util.urlreq
39
39
40 bytesresponse = wireprototypes.bytesresponse
40 bytesresponse = wireprototypes.bytesresponse
41 ooberror = wireprototypes.ooberror
41 ooberror = wireprototypes.ooberror
42 pushres = wireprototypes.pushres
42 pushres = wireprototypes.pushres
43 pusherr = wireprototypes.pusherr
43 pusherr = wireprototypes.pusherr
44 streamres = wireprototypes.streamres
44 streamres = wireprototypes.streamres
45 streamres_legacy = wireprototypes.streamreslegacy
45 streamres_legacy = wireprototypes.streamreslegacy
46
46
47 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
47 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
48 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
48 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
49 'IncompatibleClient')
49 'IncompatibleClient')
50 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
50 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
51
51
52 class remoteiterbatcher(peer.iterbatcher):
52 class remoteiterbatcher(peer.iterbatcher):
53 def __init__(self, remote):
53 def __init__(self, remote):
54 super(remoteiterbatcher, self).__init__()
54 super(remoteiterbatcher, self).__init__()
55 self._remote = remote
55 self._remote = remote
56
56
57 def __getattr__(self, name):
57 def __getattr__(self, name):
58 # Validate this method is batchable, since submit() only supports
58 # Validate this method is batchable, since submit() only supports
59 # batchable methods.
59 # batchable methods.
60 fn = getattr(self._remote, name)
60 fn = getattr(self._remote, name)
61 if not getattr(fn, 'batchable', None):
61 if not getattr(fn, 'batchable', None):
62 raise error.ProgrammingError('Attempted to batch a non-batchable '
62 raise error.ProgrammingError('Attempted to batch a non-batchable '
63 'call to %r' % name)
63 'call to %r' % name)
64
64
65 return super(remoteiterbatcher, self).__getattr__(name)
65 return super(remoteiterbatcher, self).__getattr__(name)
66
66
67 def submit(self):
67 def submit(self):
68 """Break the batch request into many patch calls and pipeline them.
68 """Break the batch request into many patch calls and pipeline them.
69
69
70 This is mostly valuable over http where request sizes can be
70 This is mostly valuable over http where request sizes can be
71 limited, but can be used in other places as well.
71 limited, but can be used in other places as well.
72 """
72 """
73 # 2-tuple of (command, arguments) that represents what will be
73 # 2-tuple of (command, arguments) that represents what will be
74 # sent over the wire.
74 # sent over the wire.
75 requests = []
75 requests = []
76
76
77 # 4-tuple of (command, final future, @batchable generator, remote
77 # 4-tuple of (command, final future, @batchable generator, remote
78 # future).
78 # future).
79 results = []
79 results = []
80
80
81 for command, args, opts, finalfuture in self.calls:
81 for command, args, opts, finalfuture in self.calls:
82 mtd = getattr(self._remote, command)
82 mtd = getattr(self._remote, command)
83 batchable = mtd.batchable(mtd.__self__, *args, **opts)
83 batchable = mtd.batchable(mtd.__self__, *args, **opts)
84
84
85 commandargs, fremote = next(batchable)
85 commandargs, fremote = next(batchable)
86 assert fremote
86 assert fremote
87 requests.append((command, commandargs))
87 requests.append((command, commandargs))
88 results.append((command, finalfuture, batchable, fremote))
88 results.append((command, finalfuture, batchable, fremote))
89
89
90 if requests:
90 if requests:
91 self._resultiter = self._remote._submitbatch(requests)
91 self._resultiter = self._remote._submitbatch(requests)
92
92
93 self._results = results
93 self._results = results
94
94
95 def results(self):
95 def results(self):
96 for command, finalfuture, batchable, remotefuture in self._results:
96 for command, finalfuture, batchable, remotefuture in self._results:
97 # Get the raw result, set it in the remote future, feed it
97 # Get the raw result, set it in the remote future, feed it
98 # back into the @batchable generator so it can be decoded, and
98 # back into the @batchable generator so it can be decoded, and
99 # set the result on the final future to this value.
99 # set the result on the final future to this value.
100 remoteresult = next(self._resultiter)
100 remoteresult = next(self._resultiter)
101 remotefuture.set(remoteresult)
101 remotefuture.set(remoteresult)
102 finalfuture.set(next(batchable))
102 finalfuture.set(next(batchable))
103
103
104 # Verify our @batchable generators only emit 2 values.
104 # Verify our @batchable generators only emit 2 values.
105 try:
105 try:
106 next(batchable)
106 next(batchable)
107 except StopIteration:
107 except StopIteration:
108 pass
108 pass
109 else:
109 else:
110 raise error.ProgrammingError('%s @batchable generator emitted '
110 raise error.ProgrammingError('%s @batchable generator emitted '
111 'unexpected value count' % command)
111 'unexpected value count' % command)
112
112
113 yield finalfuture.value
113 yield finalfuture.value
114
114
115 # Forward a couple of names from peer to make wireproto interactions
115 # Forward a couple of names from peer to make wireproto interactions
116 # slightly more sensible.
116 # slightly more sensible.
117 batchable = peer.batchable
117 batchable = peer.batchable
118 future = peer.future
118 future = peer.future
119
119
120 # list of nodes encoding / decoding
120 # list of nodes encoding / decoding
121
121
122 def decodelist(l, sep=' '):
122 def decodelist(l, sep=' '):
123 if l:
123 if l:
124 return [bin(v) for v in l.split(sep)]
124 return [bin(v) for v in l.split(sep)]
125 return []
125 return []
126
126
127 def encodelist(l, sep=' '):
127 def encodelist(l, sep=' '):
128 try:
128 try:
129 return sep.join(map(hex, l))
129 return sep.join(map(hex, l))
130 except TypeError:
130 except TypeError:
131 raise
131 raise
132
132
133 # batched call argument encoding
133 # batched call argument encoding
134
134
135 def escapearg(plain):
135 def escapearg(plain):
136 return (plain
136 return (plain
137 .replace(':', ':c')
137 .replace(':', ':c')
138 .replace(',', ':o')
138 .replace(',', ':o')
139 .replace(';', ':s')
139 .replace(';', ':s')
140 .replace('=', ':e'))
140 .replace('=', ':e'))
141
141
142 def unescapearg(escaped):
142 def unescapearg(escaped):
143 return (escaped
143 return (escaped
144 .replace(':e', '=')
144 .replace(':e', '=')
145 .replace(':s', ';')
145 .replace(':s', ';')
146 .replace(':o', ',')
146 .replace(':o', ',')
147 .replace(':c', ':'))
147 .replace(':c', ':'))
148
148
149 def encodebatchcmds(req):
149 def encodebatchcmds(req):
150 """Return a ``cmds`` argument value for the ``batch`` command."""
150 """Return a ``cmds`` argument value for the ``batch`` command."""
151 cmds = []
151 cmds = []
152 for op, argsdict in req:
152 for op, argsdict in req:
153 # Old servers didn't properly unescape argument names. So prevent
153 # Old servers didn't properly unescape argument names. So prevent
154 # the sending of argument names that may not be decoded properly by
154 # the sending of argument names that may not be decoded properly by
155 # servers.
155 # servers.
156 assert all(escapearg(k) == k for k in argsdict)
156 assert all(escapearg(k) == k for k in argsdict)
157
157
158 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
158 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
159 for k, v in argsdict.iteritems())
159 for k, v in argsdict.iteritems())
160 cmds.append('%s %s' % (op, args))
160 cmds.append('%s %s' % (op, args))
161
161
162 return ';'.join(cmds)
162 return ';'.join(cmds)
163
163
164 # mapping of options accepted by getbundle and their types
164 # mapping of options accepted by getbundle and their types
165 #
165 #
166 # Meant to be extended by extensions. It is extensions responsibility to ensure
166 # Meant to be extended by extensions. It is extensions responsibility to ensure
167 # such options are properly processed in exchange.getbundle.
167 # such options are properly processed in exchange.getbundle.
168 #
168 #
169 # supported types are:
169 # supported types are:
170 #
170 #
171 # :nodes: list of binary nodes
171 # :nodes: list of binary nodes
172 # :csv: list of comma-separated values
172 # :csv: list of comma-separated values
173 # :scsv: list of comma-separated values return as set
173 # :scsv: list of comma-separated values return as set
174 # :plain: string with no transformation needed.
174 # :plain: string with no transformation needed.
175 gboptsmap = {'heads': 'nodes',
175 gboptsmap = {'heads': 'nodes',
176 'bookmarks': 'boolean',
176 'bookmarks': 'boolean',
177 'common': 'nodes',
177 'common': 'nodes',
178 'obsmarkers': 'boolean',
178 'obsmarkers': 'boolean',
179 'phases': 'boolean',
179 'phases': 'boolean',
180 'bundlecaps': 'scsv',
180 'bundlecaps': 'scsv',
181 'listkeys': 'csv',
181 'listkeys': 'csv',
182 'cg': 'boolean',
182 'cg': 'boolean',
183 'cbattempted': 'boolean',
183 'cbattempted': 'boolean',
184 'stream': 'boolean',
184 'stream': 'boolean',
185 }
185 }
186
186
187 # client side
187 # client side
188
188
189 class wirepeer(repository.legacypeer):
189 class wirepeer(repository.legacypeer):
190 """Client-side interface for communicating with a peer repository.
190 """Client-side interface for communicating with a peer repository.
191
191
192 Methods commonly call wire protocol commands of the same name.
192 Methods commonly call wire protocol commands of the same name.
193
193
194 See also httppeer.py and sshpeer.py for protocol-specific
194 See also httppeer.py and sshpeer.py for protocol-specific
195 implementations of this interface.
195 implementations of this interface.
196 """
196 """
197 # Begin of basewirepeer interface.
197 # Begin of basewirepeer interface.
198
198
199 def iterbatch(self):
199 def iterbatch(self):
200 return remoteiterbatcher(self)
200 return remoteiterbatcher(self)
201
201
202 @batchable
202 @batchable
203 def lookup(self, key):
203 def lookup(self, key):
204 self.requirecap('lookup', _('look up remote revision'))
204 self.requirecap('lookup', _('look up remote revision'))
205 f = future()
205 f = future()
206 yield {'key': encoding.fromlocal(key)}, f
206 yield {'key': encoding.fromlocal(key)}, f
207 d = f.value
207 d = f.value
208 success, data = d[:-1].split(" ", 1)
208 success, data = d[:-1].split(" ", 1)
209 if int(success):
209 if int(success):
210 yield bin(data)
210 yield bin(data)
211 else:
211 else:
212 self._abort(error.RepoError(data))
212 self._abort(error.RepoError(data))
213
213
214 @batchable
214 @batchable
215 def heads(self):
215 def heads(self):
216 f = future()
216 f = future()
217 yield {}, f
217 yield {}, f
218 d = f.value
218 d = f.value
219 try:
219 try:
220 yield decodelist(d[:-1])
220 yield decodelist(d[:-1])
221 except ValueError:
221 except ValueError:
222 self._abort(error.ResponseError(_("unexpected response:"), d))
222 self._abort(error.ResponseError(_("unexpected response:"), d))
223
223
224 @batchable
224 @batchable
225 def known(self, nodes):
225 def known(self, nodes):
226 f = future()
226 f = future()
227 yield {'nodes': encodelist(nodes)}, f
227 yield {'nodes': encodelist(nodes)}, f
228 d = f.value
228 d = f.value
229 try:
229 try:
230 yield [bool(int(b)) for b in d]
230 yield [bool(int(b)) for b in d]
231 except ValueError:
231 except ValueError:
232 self._abort(error.ResponseError(_("unexpected response:"), d))
232 self._abort(error.ResponseError(_("unexpected response:"), d))
233
233
234 @batchable
234 @batchable
235 def branchmap(self):
235 def branchmap(self):
236 f = future()
236 f = future()
237 yield {}, f
237 yield {}, f
238 d = f.value
238 d = f.value
239 try:
239 try:
240 branchmap = {}
240 branchmap = {}
241 for branchpart in d.splitlines():
241 for branchpart in d.splitlines():
242 branchname, branchheads = branchpart.split(' ', 1)
242 branchname, branchheads = branchpart.split(' ', 1)
243 branchname = encoding.tolocal(urlreq.unquote(branchname))
243 branchname = encoding.tolocal(urlreq.unquote(branchname))
244 branchheads = decodelist(branchheads)
244 branchheads = decodelist(branchheads)
245 branchmap[branchname] = branchheads
245 branchmap[branchname] = branchheads
246 yield branchmap
246 yield branchmap
247 except TypeError:
247 except TypeError:
248 self._abort(error.ResponseError(_("unexpected response:"), d))
248 self._abort(error.ResponseError(_("unexpected response:"), d))
249
249
250 @batchable
250 @batchable
251 def listkeys(self, namespace):
251 def listkeys(self, namespace):
252 if not self.capable('pushkey'):
252 if not self.capable('pushkey'):
253 yield {}, None
253 yield {}, None
254 f = future()
254 f = future()
255 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
255 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
256 yield {'namespace': encoding.fromlocal(namespace)}, f
256 yield {'namespace': encoding.fromlocal(namespace)}, f
257 d = f.value
257 d = f.value
258 self.ui.debug('received listkey for "%s": %i bytes\n'
258 self.ui.debug('received listkey for "%s": %i bytes\n'
259 % (namespace, len(d)))
259 % (namespace, len(d)))
260 yield pushkeymod.decodekeys(d)
260 yield pushkeymod.decodekeys(d)
261
261
262 @batchable
262 @batchable
263 def pushkey(self, namespace, key, old, new):
263 def pushkey(self, namespace, key, old, new):
264 if not self.capable('pushkey'):
264 if not self.capable('pushkey'):
265 yield False, None
265 yield False, None
266 f = future()
266 f = future()
267 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
267 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
268 yield {'namespace': encoding.fromlocal(namespace),
268 yield {'namespace': encoding.fromlocal(namespace),
269 'key': encoding.fromlocal(key),
269 'key': encoding.fromlocal(key),
270 'old': encoding.fromlocal(old),
270 'old': encoding.fromlocal(old),
271 'new': encoding.fromlocal(new)}, f
271 'new': encoding.fromlocal(new)}, f
272 d = f.value
272 d = f.value
273 d, output = d.split('\n', 1)
273 d, output = d.split('\n', 1)
274 try:
274 try:
275 d = bool(int(d))
275 d = bool(int(d))
276 except ValueError:
276 except ValueError:
277 raise error.ResponseError(
277 raise error.ResponseError(
278 _('push failed (unexpected response):'), d)
278 _('push failed (unexpected response):'), d)
279 for l in output.splitlines(True):
279 for l in output.splitlines(True):
280 self.ui.status(_('remote: '), l)
280 self.ui.status(_('remote: '), l)
281 yield d
281 yield d
282
282
283 def stream_out(self):
283 def stream_out(self):
284 return self._callstream('stream_out')
284 return self._callstream('stream_out')
285
285
286 def getbundle(self, source, **kwargs):
286 def getbundle(self, source, **kwargs):
287 kwargs = pycompat.byteskwargs(kwargs)
287 kwargs = pycompat.byteskwargs(kwargs)
288 self.requirecap('getbundle', _('look up remote changes'))
288 self.requirecap('getbundle', _('look up remote changes'))
289 opts = {}
289 opts = {}
290 bundlecaps = kwargs.get('bundlecaps')
290 bundlecaps = kwargs.get('bundlecaps')
291 if bundlecaps is not None:
291 if bundlecaps is not None:
292 kwargs['bundlecaps'] = sorted(bundlecaps)
292 kwargs['bundlecaps'] = sorted(bundlecaps)
293 else:
293 else:
294 bundlecaps = () # kwargs could have it to None
294 bundlecaps = () # kwargs could have it to None
295 for key, value in kwargs.iteritems():
295 for key, value in kwargs.iteritems():
296 if value is None:
296 if value is None:
297 continue
297 continue
298 keytype = gboptsmap.get(key)
298 keytype = gboptsmap.get(key)
299 if keytype is None:
299 if keytype is None:
300 raise error.ProgrammingError(
300 raise error.ProgrammingError(
301 'Unexpectedly None keytype for key %s' % key)
301 'Unexpectedly None keytype for key %s' % key)
302 elif keytype == 'nodes':
302 elif keytype == 'nodes':
303 value = encodelist(value)
303 value = encodelist(value)
304 elif keytype in ('csv', 'scsv'):
304 elif keytype in ('csv', 'scsv'):
305 value = ','.join(value)
305 value = ','.join(value)
306 elif keytype == 'boolean':
306 elif keytype == 'boolean':
307 value = '%i' % bool(value)
307 value = '%i' % bool(value)
308 elif keytype != 'plain':
308 elif keytype != 'plain':
309 raise KeyError('unknown getbundle option type %s'
309 raise KeyError('unknown getbundle option type %s'
310 % keytype)
310 % keytype)
311 opts[key] = value
311 opts[key] = value
312 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
312 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
313 if any((cap.startswith('HG2') for cap in bundlecaps)):
313 if any((cap.startswith('HG2') for cap in bundlecaps)):
314 return bundle2.getunbundler(self.ui, f)
314 return bundle2.getunbundler(self.ui, f)
315 else:
315 else:
316 return changegroupmod.cg1unpacker(f, 'UN')
316 return changegroupmod.cg1unpacker(f, 'UN')
317
317
318 def unbundle(self, cg, heads, url):
318 def unbundle(self, cg, heads, url):
319 '''Send cg (a readable file-like object representing the
319 '''Send cg (a readable file-like object representing the
320 changegroup to push, typically a chunkbuffer object) to the
320 changegroup to push, typically a chunkbuffer object) to the
321 remote server as a bundle.
321 remote server as a bundle.
322
322
323 When pushing a bundle10 stream, return an integer indicating the
323 When pushing a bundle10 stream, return an integer indicating the
324 result of the push (see changegroup.apply()).
324 result of the push (see changegroup.apply()).
325
325
326 When pushing a bundle20 stream, return a bundle20 stream.
326 When pushing a bundle20 stream, return a bundle20 stream.
327
327
328 `url` is the url the client thinks it's pushing to, which is
328 `url` is the url the client thinks it's pushing to, which is
329 visible to hooks.
329 visible to hooks.
330 '''
330 '''
331
331
332 if heads != ['force'] and self.capable('unbundlehash'):
332 if heads != ['force'] and self.capable('unbundlehash'):
333 heads = encodelist(['hashed',
333 heads = encodelist(['hashed',
334 hashlib.sha1(''.join(sorted(heads))).digest()])
334 hashlib.sha1(''.join(sorted(heads))).digest()])
335 else:
335 else:
336 heads = encodelist(heads)
336 heads = encodelist(heads)
337
337
338 if util.safehasattr(cg, 'deltaheader'):
338 if util.safehasattr(cg, 'deltaheader'):
339 # this a bundle10, do the old style call sequence
339 # this a bundle10, do the old style call sequence
340 ret, output = self._callpush("unbundle", cg, heads=heads)
340 ret, output = self._callpush("unbundle", cg, heads=heads)
341 if ret == "":
341 if ret == "":
342 raise error.ResponseError(
342 raise error.ResponseError(
343 _('push failed:'), output)
343 _('push failed:'), output)
344 try:
344 try:
345 ret = int(ret)
345 ret = int(ret)
346 except ValueError:
346 except ValueError:
347 raise error.ResponseError(
347 raise error.ResponseError(
348 _('push failed (unexpected response):'), ret)
348 _('push failed (unexpected response):'), ret)
349
349
350 for l in output.splitlines(True):
350 for l in output.splitlines(True):
351 self.ui.status(_('remote: '), l)
351 self.ui.status(_('remote: '), l)
352 else:
352 else:
353 # bundle2 push. Send a stream, fetch a stream.
353 # bundle2 push. Send a stream, fetch a stream.
354 stream = self._calltwowaystream('unbundle', cg, heads=heads)
354 stream = self._calltwowaystream('unbundle', cg, heads=heads)
355 ret = bundle2.getunbundler(self.ui, stream)
355 ret = bundle2.getunbundler(self.ui, stream)
356 return ret
356 return ret
357
357
358 # End of basewirepeer interface.
358 # End of basewirepeer interface.
359
359
360 # Begin of baselegacywirepeer interface.
360 # Begin of baselegacywirepeer interface.
361
361
362 def branches(self, nodes):
362 def branches(self, nodes):
363 n = encodelist(nodes)
363 n = encodelist(nodes)
364 d = self._call("branches", nodes=n)
364 d = self._call("branches", nodes=n)
365 try:
365 try:
366 br = [tuple(decodelist(b)) for b in d.splitlines()]
366 br = [tuple(decodelist(b)) for b in d.splitlines()]
367 return br
367 return br
368 except ValueError:
368 except ValueError:
369 self._abort(error.ResponseError(_("unexpected response:"), d))
369 self._abort(error.ResponseError(_("unexpected response:"), d))
370
370
371 def between(self, pairs):
371 def between(self, pairs):
372 batch = 8 # avoid giant requests
372 batch = 8 # avoid giant requests
373 r = []
373 r = []
374 for i in xrange(0, len(pairs), batch):
374 for i in xrange(0, len(pairs), batch):
375 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
375 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
376 d = self._call("between", pairs=n)
376 d = self._call("between", pairs=n)
377 try:
377 try:
378 r.extend(l and decodelist(l) or [] for l in d.splitlines())
378 r.extend(l and decodelist(l) or [] for l in d.splitlines())
379 except ValueError:
379 except ValueError:
380 self._abort(error.ResponseError(_("unexpected response:"), d))
380 self._abort(error.ResponseError(_("unexpected response:"), d))
381 return r
381 return r
382
382
383 def changegroup(self, nodes, kind):
383 def changegroup(self, nodes, kind):
384 n = encodelist(nodes)
384 n = encodelist(nodes)
385 f = self._callcompressable("changegroup", roots=n)
385 f = self._callcompressable("changegroup", roots=n)
386 return changegroupmod.cg1unpacker(f, 'UN')
386 return changegroupmod.cg1unpacker(f, 'UN')
387
387
388 def changegroupsubset(self, bases, heads, kind):
388 def changegroupsubset(self, bases, heads, kind):
389 self.requirecap('changegroupsubset', _('look up remote changes'))
389 self.requirecap('changegroupsubset', _('look up remote changes'))
390 bases = encodelist(bases)
390 bases = encodelist(bases)
391 heads = encodelist(heads)
391 heads = encodelist(heads)
392 f = self._callcompressable("changegroupsubset",
392 f = self._callcompressable("changegroupsubset",
393 bases=bases, heads=heads)
393 bases=bases, heads=heads)
394 return changegroupmod.cg1unpacker(f, 'UN')
394 return changegroupmod.cg1unpacker(f, 'UN')
395
395
396 # End of baselegacywirepeer interface.
396 # End of baselegacywirepeer interface.
397
397
398 def _submitbatch(self, req):
398 def _submitbatch(self, req):
399 """run batch request <req> on the server
399 """run batch request <req> on the server
400
400
401 Returns an iterator of the raw responses from the server.
401 Returns an iterator of the raw responses from the server.
402 """
402 """
403 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
403 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
404 chunk = rsp.read(1024)
404 chunk = rsp.read(1024)
405 work = [chunk]
405 work = [chunk]
406 while chunk:
406 while chunk:
407 while ';' not in chunk and chunk:
407 while ';' not in chunk and chunk:
408 chunk = rsp.read(1024)
408 chunk = rsp.read(1024)
409 work.append(chunk)
409 work.append(chunk)
410 merged = ''.join(work)
410 merged = ''.join(work)
411 while ';' in merged:
411 while ';' in merged:
412 one, merged = merged.split(';', 1)
412 one, merged = merged.split(';', 1)
413 yield unescapearg(one)
413 yield unescapearg(one)
414 chunk = rsp.read(1024)
414 chunk = rsp.read(1024)
415 work = [merged, chunk]
415 work = [merged, chunk]
416 yield unescapearg(''.join(work))
416 yield unescapearg(''.join(work))
417
417
418 def _submitone(self, op, args):
418 def _submitone(self, op, args):
419 return self._call(op, **pycompat.strkwargs(args))
419 return self._call(op, **pycompat.strkwargs(args))
420
420
421 def debugwireargs(self, one, two, three=None, four=None, five=None):
421 def debugwireargs(self, one, two, three=None, four=None, five=None):
422 # don't pass optional arguments left at their default value
422 # don't pass optional arguments left at their default value
423 opts = {}
423 opts = {}
424 if three is not None:
424 if three is not None:
425 opts[r'three'] = three
425 opts[r'three'] = three
426 if four is not None:
426 if four is not None:
427 opts[r'four'] = four
427 opts[r'four'] = four
428 return self._call('debugwireargs', one=one, two=two, **opts)
428 return self._call('debugwireargs', one=one, two=two, **opts)
429
429
430 def _call(self, cmd, **args):
430 def _call(self, cmd, **args):
431 """execute <cmd> on the server
431 """execute <cmd> on the server
432
432
433 The command is expected to return a simple string.
433 The command is expected to return a simple string.
434
434
435 returns the server reply as a string."""
435 returns the server reply as a string."""
436 raise NotImplementedError()
436 raise NotImplementedError()
437
437
438 def _callstream(self, cmd, **args):
438 def _callstream(self, cmd, **args):
439 """execute <cmd> on the server
439 """execute <cmd> on the server
440
440
441 The command is expected to return a stream. Note that if the
441 The command is expected to return a stream. Note that if the
442 command doesn't return a stream, _callstream behaves
442 command doesn't return a stream, _callstream behaves
443 differently for ssh and http peers.
443 differently for ssh and http peers.
444
444
445 returns the server reply as a file like object.
445 returns the server reply as a file like object.
446 """
446 """
447 raise NotImplementedError()
447 raise NotImplementedError()
448
448
449 def _callcompressable(self, cmd, **args):
449 def _callcompressable(self, cmd, **args):
450 """execute <cmd> on the server
450 """execute <cmd> on the server
451
451
452 The command is expected to return a stream.
452 The command is expected to return a stream.
453
453
454 The stream may have been compressed in some implementations. This
454 The stream may have been compressed in some implementations. This
455 function takes care of the decompression. This is the only difference
455 function takes care of the decompression. This is the only difference
456 with _callstream.
456 with _callstream.
457
457
458 returns the server reply as a file like object.
458 returns the server reply as a file like object.
459 """
459 """
460 raise NotImplementedError()
460 raise NotImplementedError()
461
461
462 def _callpush(self, cmd, fp, **args):
462 def _callpush(self, cmd, fp, **args):
463 """execute a <cmd> on server
463 """execute a <cmd> on server
464
464
465 The command is expected to be related to a push. Push has a special
465 The command is expected to be related to a push. Push has a special
466 return method.
466 return method.
467
467
468 returns the server reply as a (ret, output) tuple. ret is either
468 returns the server reply as a (ret, output) tuple. ret is either
469 empty (error) or a stringified int.
469 empty (error) or a stringified int.
470 """
470 """
471 raise NotImplementedError()
471 raise NotImplementedError()
472
472
473 def _calltwowaystream(self, cmd, fp, **args):
473 def _calltwowaystream(self, cmd, fp, **args):
474 """execute <cmd> on server
474 """execute <cmd> on server
475
475
476 The command will send a stream to the server and get a stream in reply.
476 The command will send a stream to the server and get a stream in reply.
477 """
477 """
478 raise NotImplementedError()
478 raise NotImplementedError()
479
479
480 def _abort(self, exception):
480 def _abort(self, exception):
481 """clearly abort the wire protocol connection and raise the exception
481 """clearly abort the wire protocol connection and raise the exception
482 """
482 """
483 raise NotImplementedError()
483 raise NotImplementedError()
484
484
485 # server side
485 # server side
486
486
487 # wire protocol command can either return a string or one of these classes.
487 # wire protocol command can either return a string or one of these classes.
488
488
489 def getdispatchrepo(repo, proto, command):
489 def getdispatchrepo(repo, proto, command):
490 """Obtain the repo used for processing wire protocol commands.
490 """Obtain the repo used for processing wire protocol commands.
491
491
492 The intent of this function is to serve as a monkeypatch point for
492 The intent of this function is to serve as a monkeypatch point for
493 extensions that need commands to operate on different repo views under
493 extensions that need commands to operate on different repo views under
494 specialized circumstances.
494 specialized circumstances.
495 """
495 """
496 return repo.filtered('served')
496 return repo.filtered('served')
497
497
498 def dispatch(repo, proto, command):
498 def dispatch(repo, proto, command):
499 repo = getdispatchrepo(repo, proto, command)
499 repo = getdispatchrepo(repo, proto, command)
500 func, spec = commands[command]
500 func, spec = commands[command]
501 args = proto.getargs(spec)
501 args = proto.getargs(spec)
502 return func(repo, proto, *args)
502 return func(repo, proto, *args)
503
503
504 def options(cmd, keys, others):
504 def options(cmd, keys, others):
505 opts = {}
505 opts = {}
506 for k in keys:
506 for k in keys:
507 if k in others:
507 if k in others:
508 opts[k] = others[k]
508 opts[k] = others[k]
509 del others[k]
509 del others[k]
510 if others:
510 if others:
511 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
511 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
512 % (cmd, ",".join(others)))
512 % (cmd, ",".join(others)))
513 return opts
513 return opts
514
514
515 def bundle1allowed(repo, action):
515 def bundle1allowed(repo, action):
516 """Whether a bundle1 operation is allowed from the server.
516 """Whether a bundle1 operation is allowed from the server.
517
517
518 Priority is:
518 Priority is:
519
519
520 1. server.bundle1gd.<action> (if generaldelta active)
520 1. server.bundle1gd.<action> (if generaldelta active)
521 2. server.bundle1.<action>
521 2. server.bundle1.<action>
522 3. server.bundle1gd (if generaldelta active)
522 3. server.bundle1gd (if generaldelta active)
523 4. server.bundle1
523 4. server.bundle1
524 """
524 """
525 ui = repo.ui
525 ui = repo.ui
526 gd = 'generaldelta' in repo.requirements
526 gd = 'generaldelta' in repo.requirements
527
527
528 if gd:
528 if gd:
529 v = ui.configbool('server', 'bundle1gd.%s' % action)
529 v = ui.configbool('server', 'bundle1gd.%s' % action)
530 if v is not None:
530 if v is not None:
531 return v
531 return v
532
532
533 v = ui.configbool('server', 'bundle1.%s' % action)
533 v = ui.configbool('server', 'bundle1.%s' % action)
534 if v is not None:
534 if v is not None:
535 return v
535 return v
536
536
537 if gd:
537 if gd:
538 v = ui.configbool('server', 'bundle1gd')
538 v = ui.configbool('server', 'bundle1gd')
539 if v is not None:
539 if v is not None:
540 return v
540 return v
541
541
542 return ui.configbool('server', 'bundle1')
542 return ui.configbool('server', 'bundle1')
543
543
544 def supportedcompengines(ui, role):
544 def supportedcompengines(ui, role):
545 """Obtain the list of supported compression engines for a request."""
545 """Obtain the list of supported compression engines for a request."""
546 assert role in (util.CLIENTROLE, util.SERVERROLE)
546 assert role in (util.CLIENTROLE, util.SERVERROLE)
547
547
548 compengines = util.compengines.supportedwireengines(role)
548 compengines = util.compengines.supportedwireengines(role)
549
549
550 # Allow config to override default list and ordering.
550 # Allow config to override default list and ordering.
551 if role == util.SERVERROLE:
551 if role == util.SERVERROLE:
552 configengines = ui.configlist('server', 'compressionengines')
552 configengines = ui.configlist('server', 'compressionengines')
553 config = 'server.compressionengines'
553 config = 'server.compressionengines'
554 else:
554 else:
555 # This is currently implemented mainly to facilitate testing. In most
555 # This is currently implemented mainly to facilitate testing. In most
556 # cases, the server should be in charge of choosing a compression engine
556 # cases, the server should be in charge of choosing a compression engine
557 # because a server has the most to lose from a sub-optimal choice. (e.g.
557 # because a server has the most to lose from a sub-optimal choice. (e.g.
558 # CPU DoS due to an expensive engine or a network DoS due to poor
558 # CPU DoS due to an expensive engine or a network DoS due to poor
559 # compression ratio).
559 # compression ratio).
560 configengines = ui.configlist('experimental',
560 configengines = ui.configlist('experimental',
561 'clientcompressionengines')
561 'clientcompressionengines')
562 config = 'experimental.clientcompressionengines'
562 config = 'experimental.clientcompressionengines'
563
563
564 # No explicit config. Filter out the ones that aren't supposed to be
564 # No explicit config. Filter out the ones that aren't supposed to be
565 # advertised and return default ordering.
565 # advertised and return default ordering.
566 if not configengines:
566 if not configengines:
567 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
567 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
568 return [e for e in compengines
568 return [e for e in compengines
569 if getattr(e.wireprotosupport(), attr) > 0]
569 if getattr(e.wireprotosupport(), attr) > 0]
570
570
571 # If compression engines are listed in the config, assume there is a good
571 # If compression engines are listed in the config, assume there is a good
572 # reason for it (like server operators wanting to achieve specific
572 # reason for it (like server operators wanting to achieve specific
573 # performance characteristics). So fail fast if the config references
573 # performance characteristics). So fail fast if the config references
574 # unusable compression engines.
574 # unusable compression engines.
575 validnames = set(e.name() for e in compengines)
575 validnames = set(e.name() for e in compengines)
576 invalidnames = set(e for e in configengines if e not in validnames)
576 invalidnames = set(e for e in configengines if e not in validnames)
577 if invalidnames:
577 if invalidnames:
578 raise error.Abort(_('invalid compression engine defined in %s: %s') %
578 raise error.Abort(_('invalid compression engine defined in %s: %s') %
579 (config, ', '.join(sorted(invalidnames))))
579 (config, ', '.join(sorted(invalidnames))))
580
580
581 compengines = [e for e in compengines if e.name() in configengines]
581 compengines = [e for e in compengines if e.name() in configengines]
582 compengines = sorted(compengines,
582 compengines = sorted(compengines,
583 key=lambda e: configengines.index(e.name()))
583 key=lambda e: configengines.index(e.name()))
584
584
585 if not compengines:
585 if not compengines:
586 raise error.Abort(_('%s config option does not specify any known '
586 raise error.Abort(_('%s config option does not specify any known '
587 'compression engines') % config,
587 'compression engines') % config,
588 hint=_('usable compression engines: %s') %
588 hint=_('usable compression engines: %s') %
589 ', '.sorted(validnames))
589 ', '.sorted(validnames))
590
590
591 return compengines
591 return compengines
592
592
593 class commandentry(object):
593 class commandentry(object):
594 """Represents a declared wire protocol command."""
594 """Represents a declared wire protocol command."""
595 def __init__(self, func, args='', transports=None):
595 def __init__(self, func, args='', transports=None):
596 self.func = func
596 self.func = func
597 self.args = args
597 self.args = args
598 self.transports = transports or set()
598 self.transports = transports or set()
599
599
600 def _merge(self, func, args):
600 def _merge(self, func, args):
601 """Merge this instance with an incoming 2-tuple.
601 """Merge this instance with an incoming 2-tuple.
602
602
603 This is called when a caller using the old 2-tuple API attempts
603 This is called when a caller using the old 2-tuple API attempts
604 to replace an instance. The incoming values are merged with
604 to replace an instance. The incoming values are merged with
605 data not captured by the 2-tuple and a new instance containing
605 data not captured by the 2-tuple and a new instance containing
606 the union of the two objects is returned.
606 the union of the two objects is returned.
607 """
607 """
608 return commandentry(func, args=args, transports=set(self.transports))
608 return commandentry(func, args=args, transports=set(self.transports))
609
609
610 # Old code treats instances as 2-tuples. So expose that interface.
610 # Old code treats instances as 2-tuples. So expose that interface.
611 def __iter__(self):
611 def __iter__(self):
612 yield self.func
612 yield self.func
613 yield self.args
613 yield self.args
614
614
615 def __getitem__(self, i):
615 def __getitem__(self, i):
616 if i == 0:
616 if i == 0:
617 return self.func
617 return self.func
618 elif i == 1:
618 elif i == 1:
619 return self.args
619 return self.args
620 else:
620 else:
621 raise IndexError('can only access elements 0 and 1')
621 raise IndexError('can only access elements 0 and 1')
622
622
623 class commanddict(dict):
623 class commanddict(dict):
624 """Container for registered wire protocol commands.
624 """Container for registered wire protocol commands.
625
625
626 It behaves like a dict. But __setitem__ is overwritten to allow silent
626 It behaves like a dict. But __setitem__ is overwritten to allow silent
627 coercion of values from 2-tuples for API compatibility.
627 coercion of values from 2-tuples for API compatibility.
628 """
628 """
629 def __setitem__(self, k, v):
629 def __setitem__(self, k, v):
630 if isinstance(v, commandentry):
630 if isinstance(v, commandentry):
631 pass
631 pass
632 # Cast 2-tuples to commandentry instances.
632 # Cast 2-tuples to commandentry instances.
633 elif isinstance(v, tuple):
633 elif isinstance(v, tuple):
634 if len(v) != 2:
634 if len(v) != 2:
635 raise ValueError('command tuples must have exactly 2 elements')
635 raise ValueError('command tuples must have exactly 2 elements')
636
636
637 # It is common for extensions to wrap wire protocol commands via
637 # It is common for extensions to wrap wire protocol commands via
638 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
638 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
639 # doing this aren't aware of the new API that uses objects to store
639 # doing this aren't aware of the new API that uses objects to store
640 # command entries, we automatically merge old state with new.
640 # command entries, we automatically merge old state with new.
641 if k in self:
641 if k in self:
642 v = self[k]._merge(v[0], v[1])
642 v = self[k]._merge(v[0], v[1])
643 else:
643 else:
644 # Use default values from @wireprotocommand.
644 # Use default values from @wireprotocommand.
645 v = commandentry(v[0], args=v[1],
645 v = commandentry(v[0], args=v[1],
646 transports=set(wireprototypes.TRANSPORTS))
646 transports=set(wireprototypes.TRANSPORTS))
647 else:
647 else:
648 raise ValueError('command entries must be commandentry instances '
648 raise ValueError('command entries must be commandentry instances '
649 'or 2-tuples')
649 'or 2-tuples')
650
650
651 return super(commanddict, self).__setitem__(k, v)
651 return super(commanddict, self).__setitem__(k, v)
652
652
653 def commandavailable(self, command, proto):
653 def commandavailable(self, command, proto):
654 """Determine if a command is available for the requested protocol."""
654 """Determine if a command is available for the requested protocol."""
655 assert proto.name in wireprototypes.TRANSPORTS
655 assert proto.name in wireprototypes.TRANSPORTS
656
656
657 entry = self.get(command)
657 entry = self.get(command)
658
658
659 if not entry:
659 if not entry:
660 return False
660 return False
661
661
662 if proto.name not in entry.transports:
662 if proto.name not in entry.transports:
663 return False
663 return False
664
664
665 return True
665 return True
666
666
667 # Constants specifying which transports a wire protocol command should be
667 # Constants specifying which transports a wire protocol command should be
668 # available on. For use with @wireprotocommand.
668 # available on. For use with @wireprotocommand.
669 POLICY_ALL = 'all'
669 POLICY_ALL = 'all'
670 POLICY_V1_ONLY = 'v1-only'
670 POLICY_V1_ONLY = 'v1-only'
671 POLICY_V2_ONLY = 'v2-only'
671 POLICY_V2_ONLY = 'v2-only'
672
672
673 commands = commanddict()
673 commands = commanddict()
674
674
675 def wireprotocommand(name, args='', transportpolicy=POLICY_ALL):
675 def wireprotocommand(name, args='', transportpolicy=POLICY_ALL):
676 """Decorator to declare a wire protocol command.
676 """Decorator to declare a wire protocol command.
677
677
678 ``name`` is the name of the wire protocol command being provided.
678 ``name`` is the name of the wire protocol command being provided.
679
679
680 ``args`` is a space-delimited list of named arguments that the command
680 ``args`` is a space-delimited list of named arguments that the command
681 accepts. ``*`` is a special value that says to accept all arguments.
681 accepts. ``*`` is a special value that says to accept all arguments.
682
682
683 ``transportpolicy`` is a POLICY_* constant denoting which transports
683 ``transportpolicy`` is a POLICY_* constant denoting which transports
684 this wire protocol command should be exposed to. By default, commands
684 this wire protocol command should be exposed to. By default, commands
685 are exposed to all wire protocol transports.
685 are exposed to all wire protocol transports.
686 """
686 """
687 if transportpolicy == POLICY_ALL:
687 if transportpolicy == POLICY_ALL:
688 transports = set(wireprototypes.TRANSPORTS)
688 transports = set(wireprototypes.TRANSPORTS)
689 elif transportpolicy == POLICY_V1_ONLY:
689 elif transportpolicy == POLICY_V1_ONLY:
690 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
690 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
691 if v['version'] == 1}
691 if v['version'] == 1}
692 elif transportpolicy == POLICY_V2_ONLY:
692 elif transportpolicy == POLICY_V2_ONLY:
693 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
693 transports = {k for k, v in wireprototypes.TRANSPORTS.items()
694 if v['version'] == 2}
694 if v['version'] == 2}
695 else:
695 else:
696 raise error.Abort(_('invalid transport policy value: %s') %
696 raise error.Abort(_('invalid transport policy value: %s') %
697 transportpolicy)
697 transportpolicy)
698
698
699 def register(func):
699 def register(func):
700 commands[name] = commandentry(func, args=args, transports=transports)
700 commands[name] = commandentry(func, args=args, transports=transports)
701 return func
701 return func
702 return register
702 return register
703
703
704 @wireprotocommand('batch', 'cmds *')
704 @wireprotocommand('batch', 'cmds *')
705 def batch(repo, proto, cmds, others):
705 def batch(repo, proto, cmds, others):
706 repo = repo.filtered("served")
706 repo = repo.filtered("served")
707 res = []
707 res = []
708 for pair in cmds.split(';'):
708 for pair in cmds.split(';'):
709 op, args = pair.split(' ', 1)
709 op, args = pair.split(' ', 1)
710 vals = {}
710 vals = {}
711 for a in args.split(','):
711 for a in args.split(','):
712 if a:
712 if a:
713 n, v = a.split('=')
713 n, v = a.split('=')
714 vals[unescapearg(n)] = unescapearg(v)
714 vals[unescapearg(n)] = unescapearg(v)
715 func, spec = commands[op]
715 func, spec = commands[op]
716 if spec:
716 if spec:
717 keys = spec.split()
717 keys = spec.split()
718 data = {}
718 data = {}
719 for k in keys:
719 for k in keys:
720 if k == '*':
720 if k == '*':
721 star = {}
721 star = {}
722 for key in vals.keys():
722 for key in vals.keys():
723 if key not in keys:
723 if key not in keys:
724 star[key] = vals[key]
724 star[key] = vals[key]
725 data['*'] = star
725 data['*'] = star
726 else:
726 else:
727 data[k] = vals[k]
727 data[k] = vals[k]
728 result = func(repo, proto, *[data[k] for k in keys])
728 result = func(repo, proto, *[data[k] for k in keys])
729 else:
729 else:
730 result = func(repo, proto)
730 result = func(repo, proto)
731 if isinstance(result, ooberror):
731 if isinstance(result, ooberror):
732 return result
732 return result
733
733
734 # For now, all batchable commands must return bytesresponse or
734 # For now, all batchable commands must return bytesresponse or
735 # raw bytes (for backwards compatibility).
735 # raw bytes (for backwards compatibility).
736 assert isinstance(result, (bytesresponse, bytes))
736 assert isinstance(result, (bytesresponse, bytes))
737 if isinstance(result, bytesresponse):
737 if isinstance(result, bytesresponse):
738 result = result.data
738 result = result.data
739 res.append(escapearg(result))
739 res.append(escapearg(result))
740
740
741 return bytesresponse(';'.join(res))
741 return bytesresponse(';'.join(res))
742
742
743 # TODO mark as version 1 transport only once interaction with
743 # TODO mark as version 1 transport only once interaction with
744 # SSH handshake mechanism is figured out.
744 # SSH handshake mechanism is figured out.
745 @wireprotocommand('between', 'pairs')
745 @wireprotocommand('between', 'pairs')
746 def between(repo, proto, pairs):
746 def between(repo, proto, pairs):
747 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
747 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
748 r = []
748 r = []
749 for b in repo.between(pairs):
749 for b in repo.between(pairs):
750 r.append(encodelist(b) + "\n")
750 r.append(encodelist(b) + "\n")
751
751
752 return bytesresponse(''.join(r))
752 return bytesresponse(''.join(r))
753
753
754 @wireprotocommand('branchmap')
754 @wireprotocommand('branchmap')
755 def branchmap(repo, proto):
755 def branchmap(repo, proto):
756 branchmap = repo.branchmap()
756 branchmap = repo.branchmap()
757 heads = []
757 heads = []
758 for branch, nodes in branchmap.iteritems():
758 for branch, nodes in branchmap.iteritems():
759 branchname = urlreq.quote(encoding.fromlocal(branch))
759 branchname = urlreq.quote(encoding.fromlocal(branch))
760 branchnodes = encodelist(nodes)
760 branchnodes = encodelist(nodes)
761 heads.append('%s %s' % (branchname, branchnodes))
761 heads.append('%s %s' % (branchname, branchnodes))
762
762
763 return bytesresponse('\n'.join(heads))
763 return bytesresponse('\n'.join(heads))
764
764
765 @wireprotocommand('branches', 'nodes', transportpolicy=POLICY_V1_ONLY)
765 @wireprotocommand('branches', 'nodes', transportpolicy=POLICY_V1_ONLY)
766 def branches(repo, proto, nodes):
766 def branches(repo, proto, nodes):
767 nodes = decodelist(nodes)
767 nodes = decodelist(nodes)
768 r = []
768 r = []
769 for b in repo.branches(nodes):
769 for b in repo.branches(nodes):
770 r.append(encodelist(b) + "\n")
770 r.append(encodelist(b) + "\n")
771
771
772 return bytesresponse(''.join(r))
772 return bytesresponse(''.join(r))
773
773
774 @wireprotocommand('clonebundles', '')
774 @wireprotocommand('clonebundles', '')
775 def clonebundles(repo, proto):
775 def clonebundles(repo, proto):
776 """Server command for returning info for available bundles to seed clones.
776 """Server command for returning info for available bundles to seed clones.
777
777
778 Clients will parse this response and determine what bundle to fetch.
778 Clients will parse this response and determine what bundle to fetch.
779
779
780 Extensions may wrap this command to filter or dynamically emit data
780 Extensions may wrap this command to filter or dynamically emit data
781 depending on the request. e.g. you could advertise URLs for the closest
781 depending on the request. e.g. you could advertise URLs for the closest
782 data center given the client's IP address.
782 data center given the client's IP address.
783 """
783 """
784 return bytesresponse(repo.vfs.tryread('clonebundles.manifest'))
784 return bytesresponse(repo.vfs.tryread('clonebundles.manifest'))
785
785
786 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
786 wireprotocaps = ['lookup', 'branchmap', 'pushkey',
787 'known', 'getbundle', 'unbundlehash', 'batch']
787 'known', 'getbundle', 'unbundlehash', 'batch']
788
788
789 def _capabilities(repo, proto):
789 def _capabilities(repo, proto):
790 """return a list of capabilities for a repo
790 """return a list of capabilities for a repo
791
791
792 This function exists to allow extensions to easily wrap capabilities
792 This function exists to allow extensions to easily wrap capabilities
793 computation
793 computation
794
794
795 - returns a lists: easy to alter
795 - returns a lists: easy to alter
796 - change done here will be propagated to both `capabilities` and `hello`
796 - change done here will be propagated to both `capabilities` and `hello`
797 command without any other action needed.
797 command without any other action needed.
798 """
798 """
799 # copy to prevent modification of the global list
799 # copy to prevent modification of the global list
800 caps = list(wireprotocaps)
800 caps = list(wireprotocaps)
801
801
802 # Command of same name as capability isn't exposed to version 1 of
802 # Command of same name as capability isn't exposed to version 1 of
803 # transports. So conditionally add it.
803 # transports. So conditionally add it.
804 if commands.commandavailable('changegroupsubset', proto):
804 if commands.commandavailable('changegroupsubset', proto):
805 caps.append('changegroupsubset')
805 caps.append('changegroupsubset')
806
806
807 if streamclone.allowservergeneration(repo):
807 if streamclone.allowservergeneration(repo):
808 if repo.ui.configbool('server', 'preferuncompressed'):
808 if repo.ui.configbool('server', 'preferuncompressed'):
809 caps.append('stream-preferred')
809 caps.append('stream-preferred')
810 requiredformats = repo.requirements & repo.supportedformats
810 requiredformats = repo.requirements & repo.supportedformats
811 # if our local revlogs are just revlogv1, add 'stream' cap
811 # if our local revlogs are just revlogv1, add 'stream' cap
812 if not requiredformats - {'revlogv1'}:
812 if not requiredformats - {'revlogv1'}:
813 caps.append('stream')
813 caps.append('stream')
814 # otherwise, add 'streamreqs' detailing our local revlog format
814 # otherwise, add 'streamreqs' detailing our local revlog format
815 else:
815 else:
816 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
816 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
817 if repo.ui.configbool('experimental', 'bundle2-advertise'):
817 if repo.ui.configbool('experimental', 'bundle2-advertise'):
818 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
818 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
819 caps.append('bundle2=' + urlreq.quote(capsblob))
819 caps.append('bundle2=' + urlreq.quote(capsblob))
820 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
820 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
821
821
822 if proto.name == 'http-v1':
822 return proto.addcapabilities(repo, caps)
823 caps.append('httpheader=%d' %
824 repo.ui.configint('server', 'maxhttpheaderlen'))
825 if repo.ui.configbool('experimental', 'httppostargs'):
826 caps.append('httppostargs')
827
828 # FUTURE advertise 0.2rx once support is implemented
829 # FUTURE advertise minrx and mintx after consulting config option
830 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
831
832 compengines = supportedcompengines(repo.ui, util.SERVERROLE)
833 if compengines:
834 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
835 for e in compengines)
836 caps.append('compression=%s' % comptypes)
837
838 return caps
839
823
840 # If you are writing an extension and consider wrapping this function. Wrap
824 # If you are writing an extension and consider wrapping this function. Wrap
841 # `_capabilities` instead.
825 # `_capabilities` instead.
842 @wireprotocommand('capabilities')
826 @wireprotocommand('capabilities')
843 def capabilities(repo, proto):
827 def capabilities(repo, proto):
844 return bytesresponse(' '.join(_capabilities(repo, proto)))
828 return bytesresponse(' '.join(_capabilities(repo, proto)))
845
829
846 @wireprotocommand('changegroup', 'roots', transportpolicy=POLICY_V1_ONLY)
830 @wireprotocommand('changegroup', 'roots', transportpolicy=POLICY_V1_ONLY)
847 def changegroup(repo, proto, roots):
831 def changegroup(repo, proto, roots):
848 nodes = decodelist(roots)
832 nodes = decodelist(roots)
849 outgoing = discovery.outgoing(repo, missingroots=nodes,
833 outgoing = discovery.outgoing(repo, missingroots=nodes,
850 missingheads=repo.heads())
834 missingheads=repo.heads())
851 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
835 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
852 gen = iter(lambda: cg.read(32768), '')
836 gen = iter(lambda: cg.read(32768), '')
853 return streamres(gen=gen)
837 return streamres(gen=gen)
854
838
855 @wireprotocommand('changegroupsubset', 'bases heads',
839 @wireprotocommand('changegroupsubset', 'bases heads',
856 transportpolicy=POLICY_V1_ONLY)
840 transportpolicy=POLICY_V1_ONLY)
857 def changegroupsubset(repo, proto, bases, heads):
841 def changegroupsubset(repo, proto, bases, heads):
858 bases = decodelist(bases)
842 bases = decodelist(bases)
859 heads = decodelist(heads)
843 heads = decodelist(heads)
860 outgoing = discovery.outgoing(repo, missingroots=bases,
844 outgoing = discovery.outgoing(repo, missingroots=bases,
861 missingheads=heads)
845 missingheads=heads)
862 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
846 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
863 gen = iter(lambda: cg.read(32768), '')
847 gen = iter(lambda: cg.read(32768), '')
864 return streamres(gen=gen)
848 return streamres(gen=gen)
865
849
866 @wireprotocommand('debugwireargs', 'one two *')
850 @wireprotocommand('debugwireargs', 'one two *')
867 def debugwireargs(repo, proto, one, two, others):
851 def debugwireargs(repo, proto, one, two, others):
868 # only accept optional args from the known set
852 # only accept optional args from the known set
869 opts = options('debugwireargs', ['three', 'four'], others)
853 opts = options('debugwireargs', ['three', 'four'], others)
870 return bytesresponse(repo.debugwireargs(one, two,
854 return bytesresponse(repo.debugwireargs(one, two,
871 **pycompat.strkwargs(opts)))
855 **pycompat.strkwargs(opts)))
872
856
873 @wireprotocommand('getbundle', '*')
857 @wireprotocommand('getbundle', '*')
874 def getbundle(repo, proto, others):
858 def getbundle(repo, proto, others):
875 opts = options('getbundle', gboptsmap.keys(), others)
859 opts = options('getbundle', gboptsmap.keys(), others)
876 for k, v in opts.iteritems():
860 for k, v in opts.iteritems():
877 keytype = gboptsmap[k]
861 keytype = gboptsmap[k]
878 if keytype == 'nodes':
862 if keytype == 'nodes':
879 opts[k] = decodelist(v)
863 opts[k] = decodelist(v)
880 elif keytype == 'csv':
864 elif keytype == 'csv':
881 opts[k] = list(v.split(','))
865 opts[k] = list(v.split(','))
882 elif keytype == 'scsv':
866 elif keytype == 'scsv':
883 opts[k] = set(v.split(','))
867 opts[k] = set(v.split(','))
884 elif keytype == 'boolean':
868 elif keytype == 'boolean':
885 # Client should serialize False as '0', which is a non-empty string
869 # Client should serialize False as '0', which is a non-empty string
886 # so it evaluates as a True bool.
870 # so it evaluates as a True bool.
887 if v == '0':
871 if v == '0':
888 opts[k] = False
872 opts[k] = False
889 else:
873 else:
890 opts[k] = bool(v)
874 opts[k] = bool(v)
891 elif keytype != 'plain':
875 elif keytype != 'plain':
892 raise KeyError('unknown getbundle option type %s'
876 raise KeyError('unknown getbundle option type %s'
893 % keytype)
877 % keytype)
894
878
895 if not bundle1allowed(repo, 'pull'):
879 if not bundle1allowed(repo, 'pull'):
896 if not exchange.bundle2requested(opts.get('bundlecaps')):
880 if not exchange.bundle2requested(opts.get('bundlecaps')):
897 if proto.name == 'http-v1':
881 if proto.name == 'http-v1':
898 return ooberror(bundle2required)
882 return ooberror(bundle2required)
899 raise error.Abort(bundle2requiredmain,
883 raise error.Abort(bundle2requiredmain,
900 hint=bundle2requiredhint)
884 hint=bundle2requiredhint)
901
885
902 prefercompressed = True
886 prefercompressed = True
903
887
904 try:
888 try:
905 if repo.ui.configbool('server', 'disablefullbundle'):
889 if repo.ui.configbool('server', 'disablefullbundle'):
906 # Check to see if this is a full clone.
890 # Check to see if this is a full clone.
907 clheads = set(repo.changelog.heads())
891 clheads = set(repo.changelog.heads())
908 changegroup = opts.get('cg', True)
892 changegroup = opts.get('cg', True)
909 heads = set(opts.get('heads', set()))
893 heads = set(opts.get('heads', set()))
910 common = set(opts.get('common', set()))
894 common = set(opts.get('common', set()))
911 common.discard(nullid)
895 common.discard(nullid)
912 if changegroup and not common and clheads == heads:
896 if changegroup and not common and clheads == heads:
913 raise error.Abort(
897 raise error.Abort(
914 _('server has pull-based clones disabled'),
898 _('server has pull-based clones disabled'),
915 hint=_('remove --pull if specified or upgrade Mercurial'))
899 hint=_('remove --pull if specified or upgrade Mercurial'))
916
900
917 info, chunks = exchange.getbundlechunks(repo, 'serve',
901 info, chunks = exchange.getbundlechunks(repo, 'serve',
918 **pycompat.strkwargs(opts))
902 **pycompat.strkwargs(opts))
919 prefercompressed = info.get('prefercompressed', True)
903 prefercompressed = info.get('prefercompressed', True)
920 except error.Abort as exc:
904 except error.Abort as exc:
921 # cleanly forward Abort error to the client
905 # cleanly forward Abort error to the client
922 if not exchange.bundle2requested(opts.get('bundlecaps')):
906 if not exchange.bundle2requested(opts.get('bundlecaps')):
923 if proto.name == 'http-v1':
907 if proto.name == 'http-v1':
924 return ooberror(pycompat.bytestr(exc) + '\n')
908 return ooberror(pycompat.bytestr(exc) + '\n')
925 raise # cannot do better for bundle1 + ssh
909 raise # cannot do better for bundle1 + ssh
926 # bundle2 request expect a bundle2 reply
910 # bundle2 request expect a bundle2 reply
927 bundler = bundle2.bundle20(repo.ui)
911 bundler = bundle2.bundle20(repo.ui)
928 manargs = [('message', pycompat.bytestr(exc))]
912 manargs = [('message', pycompat.bytestr(exc))]
929 advargs = []
913 advargs = []
930 if exc.hint is not None:
914 if exc.hint is not None:
931 advargs.append(('hint', exc.hint))
915 advargs.append(('hint', exc.hint))
932 bundler.addpart(bundle2.bundlepart('error:abort',
916 bundler.addpart(bundle2.bundlepart('error:abort',
933 manargs, advargs))
917 manargs, advargs))
934 chunks = bundler.getchunks()
918 chunks = bundler.getchunks()
935 prefercompressed = False
919 prefercompressed = False
936
920
937 return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
921 return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
938
922
939 @wireprotocommand('heads')
923 @wireprotocommand('heads')
940 def heads(repo, proto):
924 def heads(repo, proto):
941 h = repo.heads()
925 h = repo.heads()
942 return bytesresponse(encodelist(h) + '\n')
926 return bytesresponse(encodelist(h) + '\n')
943
927
944 @wireprotocommand('hello')
928 @wireprotocommand('hello')
945 def hello(repo, proto):
929 def hello(repo, proto):
946 """Called as part of SSH handshake to obtain server info.
930 """Called as part of SSH handshake to obtain server info.
947
931
948 Returns a list of lines describing interesting things about the
932 Returns a list of lines describing interesting things about the
949 server, in an RFC822-like format.
933 server, in an RFC822-like format.
950
934
951 Currently, the only one defined is ``capabilities``, which consists of a
935 Currently, the only one defined is ``capabilities``, which consists of a
952 line of space separated tokens describing server abilities:
936 line of space separated tokens describing server abilities:
953
937
954 capabilities: <token0> <token1> <token2>
938 capabilities: <token0> <token1> <token2>
955 """
939 """
956 caps = capabilities(repo, proto).data
940 caps = capabilities(repo, proto).data
957 return bytesresponse('capabilities: %s\n' % caps)
941 return bytesresponse('capabilities: %s\n' % caps)
958
942
959 @wireprotocommand('listkeys', 'namespace')
943 @wireprotocommand('listkeys', 'namespace')
960 def listkeys(repo, proto, namespace):
944 def listkeys(repo, proto, namespace):
961 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
945 d = sorted(repo.listkeys(encoding.tolocal(namespace)).items())
962 return bytesresponse(pushkeymod.encodekeys(d))
946 return bytesresponse(pushkeymod.encodekeys(d))
963
947
964 @wireprotocommand('lookup', 'key')
948 @wireprotocommand('lookup', 'key')
965 def lookup(repo, proto, key):
949 def lookup(repo, proto, key):
966 try:
950 try:
967 k = encoding.tolocal(key)
951 k = encoding.tolocal(key)
968 c = repo[k]
952 c = repo[k]
969 r = c.hex()
953 r = c.hex()
970 success = 1
954 success = 1
971 except Exception as inst:
955 except Exception as inst:
972 r = util.forcebytestr(inst)
956 r = util.forcebytestr(inst)
973 success = 0
957 success = 0
974 return bytesresponse('%d %s\n' % (success, r))
958 return bytesresponse('%d %s\n' % (success, r))
975
959
976 @wireprotocommand('known', 'nodes *')
960 @wireprotocommand('known', 'nodes *')
977 def known(repo, proto, nodes, others):
961 def known(repo, proto, nodes, others):
978 v = ''.join(b and '1' or '0' for b in repo.known(decodelist(nodes)))
962 v = ''.join(b and '1' or '0' for b in repo.known(decodelist(nodes)))
979 return bytesresponse(v)
963 return bytesresponse(v)
980
964
981 @wireprotocommand('pushkey', 'namespace key old new')
965 @wireprotocommand('pushkey', 'namespace key old new')
982 def pushkey(repo, proto, namespace, key, old, new):
966 def pushkey(repo, proto, namespace, key, old, new):
983 # compatibility with pre-1.8 clients which were accidentally
967 # compatibility with pre-1.8 clients which were accidentally
984 # sending raw binary nodes rather than utf-8-encoded hex
968 # sending raw binary nodes rather than utf-8-encoded hex
985 if len(new) == 20 and util.escapestr(new) != new:
969 if len(new) == 20 and util.escapestr(new) != new:
986 # looks like it could be a binary node
970 # looks like it could be a binary node
987 try:
971 try:
988 new.decode('utf-8')
972 new.decode('utf-8')
989 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
973 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
990 except UnicodeDecodeError:
974 except UnicodeDecodeError:
991 pass # binary, leave unmodified
975 pass # binary, leave unmodified
992 else:
976 else:
993 new = encoding.tolocal(new) # normal path
977 new = encoding.tolocal(new) # normal path
994
978
995 with proto.mayberedirectstdio() as output:
979 with proto.mayberedirectstdio() as output:
996 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
980 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
997 encoding.tolocal(old), new) or False
981 encoding.tolocal(old), new) or False
998
982
999 output = output.getvalue() if output else ''
983 output = output.getvalue() if output else ''
1000 return bytesresponse('%d\n%s' % (int(r), output))
984 return bytesresponse('%d\n%s' % (int(r), output))
1001
985
1002 @wireprotocommand('stream_out')
986 @wireprotocommand('stream_out')
1003 def stream(repo, proto):
987 def stream(repo, proto):
1004 '''If the server supports streaming clone, it advertises the "stream"
988 '''If the server supports streaming clone, it advertises the "stream"
1005 capability with a value representing the version and flags of the repo
989 capability with a value representing the version and flags of the repo
1006 it is serving. Client checks to see if it understands the format.
990 it is serving. Client checks to see if it understands the format.
1007 '''
991 '''
1008 return streamres_legacy(streamclone.generatev1wireproto(repo))
992 return streamres_legacy(streamclone.generatev1wireproto(repo))
1009
993
1010 @wireprotocommand('unbundle', 'heads')
994 @wireprotocommand('unbundle', 'heads')
1011 def unbundle(repo, proto, heads):
995 def unbundle(repo, proto, heads):
1012 their_heads = decodelist(heads)
996 their_heads = decodelist(heads)
1013
997
1014 with proto.mayberedirectstdio() as output:
998 with proto.mayberedirectstdio() as output:
1015 try:
999 try:
1016 exchange.check_heads(repo, their_heads, 'preparing changes')
1000 exchange.check_heads(repo, their_heads, 'preparing changes')
1017
1001
1018 # write bundle data to temporary file because it can be big
1002 # write bundle data to temporary file because it can be big
1019 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1003 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1020 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1004 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1021 r = 0
1005 r = 0
1022 try:
1006 try:
1023 proto.forwardpayload(fp)
1007 proto.forwardpayload(fp)
1024 fp.seek(0)
1008 fp.seek(0)
1025 gen = exchange.readbundle(repo.ui, fp, None)
1009 gen = exchange.readbundle(repo.ui, fp, None)
1026 if (isinstance(gen, changegroupmod.cg1unpacker)
1010 if (isinstance(gen, changegroupmod.cg1unpacker)
1027 and not bundle1allowed(repo, 'push')):
1011 and not bundle1allowed(repo, 'push')):
1028 if proto.name == 'http-v1':
1012 if proto.name == 'http-v1':
1029 # need to special case http because stderr do not get to
1013 # need to special case http because stderr do not get to
1030 # the http client on failed push so we need to abuse
1014 # the http client on failed push so we need to abuse
1031 # some other error type to make sure the message get to
1015 # some other error type to make sure the message get to
1032 # the user.
1016 # the user.
1033 return ooberror(bundle2required)
1017 return ooberror(bundle2required)
1034 raise error.Abort(bundle2requiredmain,
1018 raise error.Abort(bundle2requiredmain,
1035 hint=bundle2requiredhint)
1019 hint=bundle2requiredhint)
1036
1020
1037 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1021 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1038 proto.client())
1022 proto.client())
1039 if util.safehasattr(r, 'addpart'):
1023 if util.safehasattr(r, 'addpart'):
1040 # The return looks streamable, we are in the bundle2 case
1024 # The return looks streamable, we are in the bundle2 case
1041 # and should return a stream.
1025 # and should return a stream.
1042 return streamres_legacy(gen=r.getchunks())
1026 return streamres_legacy(gen=r.getchunks())
1043 return pushres(r, output.getvalue() if output else '')
1027 return pushres(r, output.getvalue() if output else '')
1044
1028
1045 finally:
1029 finally:
1046 fp.close()
1030 fp.close()
1047 os.unlink(tempname)
1031 os.unlink(tempname)
1048
1032
1049 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1033 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1050 # handle non-bundle2 case first
1034 # handle non-bundle2 case first
1051 if not getattr(exc, 'duringunbundle2', False):
1035 if not getattr(exc, 'duringunbundle2', False):
1052 try:
1036 try:
1053 raise
1037 raise
1054 except error.Abort:
1038 except error.Abort:
1055 # The old code we moved used util.stderr directly.
1039 # The old code we moved used util.stderr directly.
1056 # We did not change it to minimise code change.
1040 # We did not change it to minimise code change.
1057 # This need to be moved to something proper.
1041 # This need to be moved to something proper.
1058 # Feel free to do it.
1042 # Feel free to do it.
1059 util.stderr.write("abort: %s\n" % exc)
1043 util.stderr.write("abort: %s\n" % exc)
1060 if exc.hint is not None:
1044 if exc.hint is not None:
1061 util.stderr.write("(%s)\n" % exc.hint)
1045 util.stderr.write("(%s)\n" % exc.hint)
1062 return pushres(0, output.getvalue() if output else '')
1046 return pushres(0, output.getvalue() if output else '')
1063 except error.PushRaced:
1047 except error.PushRaced:
1064 return pusherr(str(exc),
1048 return pusherr(str(exc),
1065 output.getvalue() if output else '')
1049 output.getvalue() if output else '')
1066
1050
1067 bundler = bundle2.bundle20(repo.ui)
1051 bundler = bundle2.bundle20(repo.ui)
1068 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1052 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1069 bundler.addpart(out)
1053 bundler.addpart(out)
1070 try:
1054 try:
1071 try:
1055 try:
1072 raise
1056 raise
1073 except error.PushkeyFailed as exc:
1057 except error.PushkeyFailed as exc:
1074 # check client caps
1058 # check client caps
1075 remotecaps = getattr(exc, '_replycaps', None)
1059 remotecaps = getattr(exc, '_replycaps', None)
1076 if (remotecaps is not None
1060 if (remotecaps is not None
1077 and 'pushkey' not in remotecaps.get('error', ())):
1061 and 'pushkey' not in remotecaps.get('error', ())):
1078 # no support remote side, fallback to Abort handler.
1062 # no support remote side, fallback to Abort handler.
1079 raise
1063 raise
1080 part = bundler.newpart('error:pushkey')
1064 part = bundler.newpart('error:pushkey')
1081 part.addparam('in-reply-to', exc.partid)
1065 part.addparam('in-reply-to', exc.partid)
1082 if exc.namespace is not None:
1066 if exc.namespace is not None:
1083 part.addparam('namespace', exc.namespace,
1067 part.addparam('namespace', exc.namespace,
1084 mandatory=False)
1068 mandatory=False)
1085 if exc.key is not None:
1069 if exc.key is not None:
1086 part.addparam('key', exc.key, mandatory=False)
1070 part.addparam('key', exc.key, mandatory=False)
1087 if exc.new is not None:
1071 if exc.new is not None:
1088 part.addparam('new', exc.new, mandatory=False)
1072 part.addparam('new', exc.new, mandatory=False)
1089 if exc.old is not None:
1073 if exc.old is not None:
1090 part.addparam('old', exc.old, mandatory=False)
1074 part.addparam('old', exc.old, mandatory=False)
1091 if exc.ret is not None:
1075 if exc.ret is not None:
1092 part.addparam('ret', exc.ret, mandatory=False)
1076 part.addparam('ret', exc.ret, mandatory=False)
1093 except error.BundleValueError as exc:
1077 except error.BundleValueError as exc:
1094 errpart = bundler.newpart('error:unsupportedcontent')
1078 errpart = bundler.newpart('error:unsupportedcontent')
1095 if exc.parttype is not None:
1079 if exc.parttype is not None:
1096 errpart.addparam('parttype', exc.parttype)
1080 errpart.addparam('parttype', exc.parttype)
1097 if exc.params:
1081 if exc.params:
1098 errpart.addparam('params', '\0'.join(exc.params))
1082 errpart.addparam('params', '\0'.join(exc.params))
1099 except error.Abort as exc:
1083 except error.Abort as exc:
1100 manargs = [('message', util.forcebytestr(exc))]
1084 manargs = [('message', util.forcebytestr(exc))]
1101 advargs = []
1085 advargs = []
1102 if exc.hint is not None:
1086 if exc.hint is not None:
1103 advargs.append(('hint', exc.hint))
1087 advargs.append(('hint', exc.hint))
1104 bundler.addpart(bundle2.bundlepart('error:abort',
1088 bundler.addpart(bundle2.bundlepart('error:abort',
1105 manargs, advargs))
1089 manargs, advargs))
1106 except error.PushRaced as exc:
1090 except error.PushRaced as exc:
1107 bundler.newpart('error:pushraced',
1091 bundler.newpart('error:pushraced',
1108 [('message', util.forcebytestr(exc))])
1092 [('message', util.forcebytestr(exc))])
1109 return streamres_legacy(gen=bundler.getchunks())
1093 return streamres_legacy(gen=bundler.getchunks())
@@ -1,618 +1,639 b''
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 #
3 #
4 # This software may be used and distributed according to the terms of the
4 # This software may be used and distributed according to the terms of the
5 # GNU General Public License version 2 or any later version.
5 # GNU General Public License version 2 or any later version.
6
6
7 from __future__ import absolute_import
7 from __future__ import absolute_import
8
8
9 import contextlib
9 import contextlib
10 import struct
10 import struct
11 import sys
11 import sys
12 import threading
12 import threading
13
13
14 from .i18n import _
14 from .i18n import _
15 from . import (
15 from . import (
16 encoding,
16 encoding,
17 error,
17 error,
18 hook,
18 hook,
19 pycompat,
19 pycompat,
20 util,
20 util,
21 wireproto,
21 wireproto,
22 wireprototypes,
22 wireprototypes,
23 )
23 )
24
24
25 stringio = util.stringio
25 stringio = util.stringio
26
26
27 urlerr = util.urlerr
27 urlerr = util.urlerr
28 urlreq = util.urlreq
28 urlreq = util.urlreq
29
29
30 HTTP_OK = 200
30 HTTP_OK = 200
31
31
32 HGTYPE = 'application/mercurial-0.1'
32 HGTYPE = 'application/mercurial-0.1'
33 HGTYPE2 = 'application/mercurial-0.2'
33 HGTYPE2 = 'application/mercurial-0.2'
34 HGERRTYPE = 'application/hg-error'
34 HGERRTYPE = 'application/hg-error'
35
35
36 SSHV1 = wireprototypes.SSHV1
36 SSHV1 = wireprototypes.SSHV1
37 SSHV2 = wireprototypes.SSHV2
37 SSHV2 = wireprototypes.SSHV2
38
38
39 def decodevaluefromheaders(req, headerprefix):
39 def decodevaluefromheaders(req, headerprefix):
40 """Decode a long value from multiple HTTP request headers.
40 """Decode a long value from multiple HTTP request headers.
41
41
42 Returns the value as a bytes, not a str.
42 Returns the value as a bytes, not a str.
43 """
43 """
44 chunks = []
44 chunks = []
45 i = 1
45 i = 1
46 prefix = headerprefix.upper().replace(r'-', r'_')
46 prefix = headerprefix.upper().replace(r'-', r'_')
47 while True:
47 while True:
48 v = req.env.get(r'HTTP_%s_%d' % (prefix, i))
48 v = req.env.get(r'HTTP_%s_%d' % (prefix, i))
49 if v is None:
49 if v is None:
50 break
50 break
51 chunks.append(pycompat.bytesurl(v))
51 chunks.append(pycompat.bytesurl(v))
52 i += 1
52 i += 1
53
53
54 return ''.join(chunks)
54 return ''.join(chunks)
55
55
56 class httpv1protocolhandler(wireprototypes.baseprotocolhandler):
56 class httpv1protocolhandler(wireprototypes.baseprotocolhandler):
57 def __init__(self, req, ui):
57 def __init__(self, req, ui):
58 self._req = req
58 self._req = req
59 self._ui = ui
59 self._ui = ui
60
60
61 @property
61 @property
62 def name(self):
62 def name(self):
63 return 'http-v1'
63 return 'http-v1'
64
64
65 def getargs(self, args):
65 def getargs(self, args):
66 knownargs = self._args()
66 knownargs = self._args()
67 data = {}
67 data = {}
68 keys = args.split()
68 keys = args.split()
69 for k in keys:
69 for k in keys:
70 if k == '*':
70 if k == '*':
71 star = {}
71 star = {}
72 for key in knownargs.keys():
72 for key in knownargs.keys():
73 if key != 'cmd' and key not in keys:
73 if key != 'cmd' and key not in keys:
74 star[key] = knownargs[key][0]
74 star[key] = knownargs[key][0]
75 data['*'] = star
75 data['*'] = star
76 else:
76 else:
77 data[k] = knownargs[k][0]
77 data[k] = knownargs[k][0]
78 return [data[k] for k in keys]
78 return [data[k] for k in keys]
79
79
80 def _args(self):
80 def _args(self):
81 args = util.rapply(pycompat.bytesurl, self._req.form.copy())
81 args = util.rapply(pycompat.bytesurl, self._req.form.copy())
82 postlen = int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
82 postlen = int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
83 if postlen:
83 if postlen:
84 args.update(urlreq.parseqs(
84 args.update(urlreq.parseqs(
85 self._req.read(postlen), keep_blank_values=True))
85 self._req.read(postlen), keep_blank_values=True))
86 return args
86 return args
87
87
88 argvalue = decodevaluefromheaders(self._req, r'X-HgArg')
88 argvalue = decodevaluefromheaders(self._req, r'X-HgArg')
89 args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
89 args.update(urlreq.parseqs(argvalue, keep_blank_values=True))
90 return args
90 return args
91
91
92 def forwardpayload(self, fp):
92 def forwardpayload(self, fp):
93 if r'HTTP_CONTENT_LENGTH' in self._req.env:
93 if r'HTTP_CONTENT_LENGTH' in self._req.env:
94 length = int(self._req.env[r'HTTP_CONTENT_LENGTH'])
94 length = int(self._req.env[r'HTTP_CONTENT_LENGTH'])
95 else:
95 else:
96 length = int(self._req.env[r'CONTENT_LENGTH'])
96 length = int(self._req.env[r'CONTENT_LENGTH'])
97 # If httppostargs is used, we need to read Content-Length
97 # If httppostargs is used, we need to read Content-Length
98 # minus the amount that was consumed by args.
98 # minus the amount that was consumed by args.
99 length -= int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
99 length -= int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
100 for s in util.filechunkiter(self._req, limit=length):
100 for s in util.filechunkiter(self._req, limit=length):
101 fp.write(s)
101 fp.write(s)
102
102
103 @contextlib.contextmanager
103 @contextlib.contextmanager
104 def mayberedirectstdio(self):
104 def mayberedirectstdio(self):
105 oldout = self._ui.fout
105 oldout = self._ui.fout
106 olderr = self._ui.ferr
106 olderr = self._ui.ferr
107
107
108 out = util.stringio()
108 out = util.stringio()
109
109
110 try:
110 try:
111 self._ui.fout = out
111 self._ui.fout = out
112 self._ui.ferr = out
112 self._ui.ferr = out
113 yield out
113 yield out
114 finally:
114 finally:
115 self._ui.fout = oldout
115 self._ui.fout = oldout
116 self._ui.ferr = olderr
116 self._ui.ferr = olderr
117
117
118 def client(self):
118 def client(self):
119 return 'remote:%s:%s:%s' % (
119 return 'remote:%s:%s:%s' % (
120 self._req.env.get('wsgi.url_scheme') or 'http',
120 self._req.env.get('wsgi.url_scheme') or 'http',
121 urlreq.quote(self._req.env.get('REMOTE_HOST', '')),
121 urlreq.quote(self._req.env.get('REMOTE_HOST', '')),
122 urlreq.quote(self._req.env.get('REMOTE_USER', '')))
122 urlreq.quote(self._req.env.get('REMOTE_USER', '')))
123
123
124 def addcapabilities(self, repo, caps):
125 caps.append('httpheader=%d' %
126 repo.ui.configint('server', 'maxhttpheaderlen'))
127 if repo.ui.configbool('experimental', 'httppostargs'):
128 caps.append('httppostargs')
129
130 # FUTURE advertise 0.2rx once support is implemented
131 # FUTURE advertise minrx and mintx after consulting config option
132 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
133
134 compengines = wireproto.supportedcompengines(repo.ui, util.SERVERROLE)
135 if compengines:
136 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
137 for e in compengines)
138 caps.append('compression=%s' % comptypes)
139
140 return caps
141
124 # This method exists mostly so that extensions like remotefilelog can
142 # This method exists mostly so that extensions like remotefilelog can
125 # disable a kludgey legacy method only over http. As of early 2018,
143 # disable a kludgey legacy method only over http. As of early 2018,
126 # there are no other known users, so with any luck we can discard this
144 # there are no other known users, so with any luck we can discard this
127 # hook if remotefilelog becomes a first-party extension.
145 # hook if remotefilelog becomes a first-party extension.
128 def iscmd(cmd):
146 def iscmd(cmd):
129 return cmd in wireproto.commands
147 return cmd in wireproto.commands
130
148
131 def parsehttprequest(repo, req, query):
149 def parsehttprequest(repo, req, query):
132 """Parse the HTTP request for a wire protocol request.
150 """Parse the HTTP request for a wire protocol request.
133
151
134 If the current request appears to be a wire protocol request, this
152 If the current request appears to be a wire protocol request, this
135 function returns a dict with details about that request, including
153 function returns a dict with details about that request, including
136 an ``abstractprotocolserver`` instance suitable for handling the
154 an ``abstractprotocolserver`` instance suitable for handling the
137 request. Otherwise, ``None`` is returned.
155 request. Otherwise, ``None`` is returned.
138
156
139 ``req`` is a ``wsgirequest`` instance.
157 ``req`` is a ``wsgirequest`` instance.
140 """
158 """
141 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
159 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
142 # string parameter. If it isn't present, this isn't a wire protocol
160 # string parameter. If it isn't present, this isn't a wire protocol
143 # request.
161 # request.
144 if r'cmd' not in req.form:
162 if r'cmd' not in req.form:
145 return None
163 return None
146
164
147 cmd = pycompat.sysbytes(req.form[r'cmd'][0])
165 cmd = pycompat.sysbytes(req.form[r'cmd'][0])
148
166
149 # The "cmd" request parameter is used by both the wire protocol and hgweb.
167 # The "cmd" request parameter is used by both the wire protocol and hgweb.
150 # While not all wire protocol commands are available for all transports,
168 # While not all wire protocol commands are available for all transports,
151 # if we see a "cmd" value that resembles a known wire protocol command, we
169 # if we see a "cmd" value that resembles a known wire protocol command, we
152 # route it to a protocol handler. This is better than routing possible
170 # route it to a protocol handler. This is better than routing possible
153 # wire protocol requests to hgweb because it prevents hgweb from using
171 # wire protocol requests to hgweb because it prevents hgweb from using
154 # known wire protocol commands and it is less confusing for machine
172 # known wire protocol commands and it is less confusing for machine
155 # clients.
173 # clients.
156 if not iscmd(cmd):
174 if not iscmd(cmd):
157 return None
175 return None
158
176
159 proto = httpv1protocolhandler(req, repo.ui)
177 proto = httpv1protocolhandler(req, repo.ui)
160
178
161 return {
179 return {
162 'cmd': cmd,
180 'cmd': cmd,
163 'proto': proto,
181 'proto': proto,
164 'dispatch': lambda: _callhttp(repo, req, proto, cmd),
182 'dispatch': lambda: _callhttp(repo, req, proto, cmd),
165 'handleerror': lambda ex: _handlehttperror(ex, req, cmd),
183 'handleerror': lambda ex: _handlehttperror(ex, req, cmd),
166 }
184 }
167
185
168 def _httpresponsetype(ui, req, prefer_uncompressed):
186 def _httpresponsetype(ui, req, prefer_uncompressed):
169 """Determine the appropriate response type and compression settings.
187 """Determine the appropriate response type and compression settings.
170
188
171 Returns a tuple of (mediatype, compengine, engineopts).
189 Returns a tuple of (mediatype, compengine, engineopts).
172 """
190 """
173 # Determine the response media type and compression engine based
191 # Determine the response media type and compression engine based
174 # on the request parameters.
192 # on the request parameters.
175 protocaps = decodevaluefromheaders(req, r'X-HgProto').split(' ')
193 protocaps = decodevaluefromheaders(req, r'X-HgProto').split(' ')
176
194
177 if '0.2' in protocaps:
195 if '0.2' in protocaps:
178 # All clients are expected to support uncompressed data.
196 # All clients are expected to support uncompressed data.
179 if prefer_uncompressed:
197 if prefer_uncompressed:
180 return HGTYPE2, util._noopengine(), {}
198 return HGTYPE2, util._noopengine(), {}
181
199
182 # Default as defined by wire protocol spec.
200 # Default as defined by wire protocol spec.
183 compformats = ['zlib', 'none']
201 compformats = ['zlib', 'none']
184 for cap in protocaps:
202 for cap in protocaps:
185 if cap.startswith('comp='):
203 if cap.startswith('comp='):
186 compformats = cap[5:].split(',')
204 compformats = cap[5:].split(',')
187 break
205 break
188
206
189 # Now find an agreed upon compression format.
207 # Now find an agreed upon compression format.
190 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
208 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
191 if engine.wireprotosupport().name in compformats:
209 if engine.wireprotosupport().name in compformats:
192 opts = {}
210 opts = {}
193 level = ui.configint('server', '%slevel' % engine.name())
211 level = ui.configint('server', '%slevel' % engine.name())
194 if level is not None:
212 if level is not None:
195 opts['level'] = level
213 opts['level'] = level
196
214
197 return HGTYPE2, engine, opts
215 return HGTYPE2, engine, opts
198
216
199 # No mutually supported compression format. Fall back to the
217 # No mutually supported compression format. Fall back to the
200 # legacy protocol.
218 # legacy protocol.
201
219
202 # Don't allow untrusted settings because disabling compression or
220 # Don't allow untrusted settings because disabling compression or
203 # setting a very high compression level could lead to flooding
221 # setting a very high compression level could lead to flooding
204 # the server's network or CPU.
222 # the server's network or CPU.
205 opts = {'level': ui.configint('server', 'zliblevel')}
223 opts = {'level': ui.configint('server', 'zliblevel')}
206 return HGTYPE, util.compengines['zlib'], opts
224 return HGTYPE, util.compengines['zlib'], opts
207
225
208 def _callhttp(repo, req, proto, cmd):
226 def _callhttp(repo, req, proto, cmd):
209 def genversion2(gen, engine, engineopts):
227 def genversion2(gen, engine, engineopts):
210 # application/mercurial-0.2 always sends a payload header
228 # application/mercurial-0.2 always sends a payload header
211 # identifying the compression engine.
229 # identifying the compression engine.
212 name = engine.wireprotosupport().name
230 name = engine.wireprotosupport().name
213 assert 0 < len(name) < 256
231 assert 0 < len(name) < 256
214 yield struct.pack('B', len(name))
232 yield struct.pack('B', len(name))
215 yield name
233 yield name
216
234
217 for chunk in gen:
235 for chunk in gen:
218 yield chunk
236 yield chunk
219
237
220 rsp = wireproto.dispatch(repo, proto, cmd)
238 rsp = wireproto.dispatch(repo, proto, cmd)
221
239
222 if not wireproto.commands.commandavailable(cmd, proto):
240 if not wireproto.commands.commandavailable(cmd, proto):
223 req.respond(HTTP_OK, HGERRTYPE,
241 req.respond(HTTP_OK, HGERRTYPE,
224 body=_('requested wire protocol command is not available '
242 body=_('requested wire protocol command is not available '
225 'over HTTP'))
243 'over HTTP'))
226 return []
244 return []
227
245
228 if isinstance(rsp, bytes):
246 if isinstance(rsp, bytes):
229 req.respond(HTTP_OK, HGTYPE, body=rsp)
247 req.respond(HTTP_OK, HGTYPE, body=rsp)
230 return []
248 return []
231 elif isinstance(rsp, wireprototypes.bytesresponse):
249 elif isinstance(rsp, wireprototypes.bytesresponse):
232 req.respond(HTTP_OK, HGTYPE, body=rsp.data)
250 req.respond(HTTP_OK, HGTYPE, body=rsp.data)
233 return []
251 return []
234 elif isinstance(rsp, wireprototypes.streamreslegacy):
252 elif isinstance(rsp, wireprototypes.streamreslegacy):
235 gen = rsp.gen
253 gen = rsp.gen
236 req.respond(HTTP_OK, HGTYPE)
254 req.respond(HTTP_OK, HGTYPE)
237 return gen
255 return gen
238 elif isinstance(rsp, wireprototypes.streamres):
256 elif isinstance(rsp, wireprototypes.streamres):
239 gen = rsp.gen
257 gen = rsp.gen
240
258
241 # This code for compression should not be streamres specific. It
259 # This code for compression should not be streamres specific. It
242 # is here because we only compress streamres at the moment.
260 # is here because we only compress streamres at the moment.
243 mediatype, engine, engineopts = _httpresponsetype(
261 mediatype, engine, engineopts = _httpresponsetype(
244 repo.ui, req, rsp.prefer_uncompressed)
262 repo.ui, req, rsp.prefer_uncompressed)
245 gen = engine.compressstream(gen, engineopts)
263 gen = engine.compressstream(gen, engineopts)
246
264
247 if mediatype == HGTYPE2:
265 if mediatype == HGTYPE2:
248 gen = genversion2(gen, engine, engineopts)
266 gen = genversion2(gen, engine, engineopts)
249
267
250 req.respond(HTTP_OK, mediatype)
268 req.respond(HTTP_OK, mediatype)
251 return gen
269 return gen
252 elif isinstance(rsp, wireprototypes.pushres):
270 elif isinstance(rsp, wireprototypes.pushres):
253 rsp = '%d\n%s' % (rsp.res, rsp.output)
271 rsp = '%d\n%s' % (rsp.res, rsp.output)
254 req.respond(HTTP_OK, HGTYPE, body=rsp)
272 req.respond(HTTP_OK, HGTYPE, body=rsp)
255 return []
273 return []
256 elif isinstance(rsp, wireprototypes.pusherr):
274 elif isinstance(rsp, wireprototypes.pusherr):
257 # This is the httplib workaround documented in _handlehttperror().
275 # This is the httplib workaround documented in _handlehttperror().
258 req.drain()
276 req.drain()
259
277
260 rsp = '0\n%s\n' % rsp.res
278 rsp = '0\n%s\n' % rsp.res
261 req.respond(HTTP_OK, HGTYPE, body=rsp)
279 req.respond(HTTP_OK, HGTYPE, body=rsp)
262 return []
280 return []
263 elif isinstance(rsp, wireprototypes.ooberror):
281 elif isinstance(rsp, wireprototypes.ooberror):
264 rsp = rsp.message
282 rsp = rsp.message
265 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
283 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
266 return []
284 return []
267 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
285 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
268
286
269 def _handlehttperror(e, req, cmd):
287 def _handlehttperror(e, req, cmd):
270 """Called when an ErrorResponse is raised during HTTP request processing."""
288 """Called when an ErrorResponse is raised during HTTP request processing."""
271
289
272 # Clients using Python's httplib are stateful: the HTTP client
290 # Clients using Python's httplib are stateful: the HTTP client
273 # won't process an HTTP response until all request data is
291 # won't process an HTTP response until all request data is
274 # sent to the server. The intent of this code is to ensure
292 # sent to the server. The intent of this code is to ensure
275 # we always read HTTP request data from the client, thus
293 # we always read HTTP request data from the client, thus
276 # ensuring httplib transitions to a state that allows it to read
294 # ensuring httplib transitions to a state that allows it to read
277 # the HTTP response. In other words, it helps prevent deadlocks
295 # the HTTP response. In other words, it helps prevent deadlocks
278 # on clients using httplib.
296 # on clients using httplib.
279
297
280 if (req.env[r'REQUEST_METHOD'] == r'POST' and
298 if (req.env[r'REQUEST_METHOD'] == r'POST' and
281 # But not if Expect: 100-continue is being used.
299 # But not if Expect: 100-continue is being used.
282 (req.env.get('HTTP_EXPECT',
300 (req.env.get('HTTP_EXPECT',
283 '').lower() != '100-continue') or
301 '').lower() != '100-continue') or
284 # Or the non-httplib HTTP library is being advertised by
302 # Or the non-httplib HTTP library is being advertised by
285 # the client.
303 # the client.
286 req.env.get('X-HgHttp2', '')):
304 req.env.get('X-HgHttp2', '')):
287 req.drain()
305 req.drain()
288 else:
306 else:
289 req.headers.append((r'Connection', r'Close'))
307 req.headers.append((r'Connection', r'Close'))
290
308
291 # TODO This response body assumes the failed command was
309 # TODO This response body assumes the failed command was
292 # "unbundle." That assumption is not always valid.
310 # "unbundle." That assumption is not always valid.
293 req.respond(e, HGTYPE, body='0\n%s\n' % pycompat.bytestr(e))
311 req.respond(e, HGTYPE, body='0\n%s\n' % pycompat.bytestr(e))
294
312
295 return ''
313 return ''
296
314
297 def _sshv1respondbytes(fout, value):
315 def _sshv1respondbytes(fout, value):
298 """Send a bytes response for protocol version 1."""
316 """Send a bytes response for protocol version 1."""
299 fout.write('%d\n' % len(value))
317 fout.write('%d\n' % len(value))
300 fout.write(value)
318 fout.write(value)
301 fout.flush()
319 fout.flush()
302
320
303 def _sshv1respondstream(fout, source):
321 def _sshv1respondstream(fout, source):
304 write = fout.write
322 write = fout.write
305 for chunk in source.gen:
323 for chunk in source.gen:
306 write(chunk)
324 write(chunk)
307 fout.flush()
325 fout.flush()
308
326
309 def _sshv1respondooberror(fout, ferr, rsp):
327 def _sshv1respondooberror(fout, ferr, rsp):
310 ferr.write(b'%s\n-\n' % rsp)
328 ferr.write(b'%s\n-\n' % rsp)
311 ferr.flush()
329 ferr.flush()
312 fout.write(b'\n')
330 fout.write(b'\n')
313 fout.flush()
331 fout.flush()
314
332
315 class sshv1protocolhandler(wireprototypes.baseprotocolhandler):
333 class sshv1protocolhandler(wireprototypes.baseprotocolhandler):
316 """Handler for requests services via version 1 of SSH protocol."""
334 """Handler for requests services via version 1 of SSH protocol."""
317 def __init__(self, ui, fin, fout):
335 def __init__(self, ui, fin, fout):
318 self._ui = ui
336 self._ui = ui
319 self._fin = fin
337 self._fin = fin
320 self._fout = fout
338 self._fout = fout
321
339
322 @property
340 @property
323 def name(self):
341 def name(self):
324 return wireprototypes.SSHV1
342 return wireprototypes.SSHV1
325
343
326 def getargs(self, args):
344 def getargs(self, args):
327 data = {}
345 data = {}
328 keys = args.split()
346 keys = args.split()
329 for n in xrange(len(keys)):
347 for n in xrange(len(keys)):
330 argline = self._fin.readline()[:-1]
348 argline = self._fin.readline()[:-1]
331 arg, l = argline.split()
349 arg, l = argline.split()
332 if arg not in keys:
350 if arg not in keys:
333 raise error.Abort(_("unexpected parameter %r") % arg)
351 raise error.Abort(_("unexpected parameter %r") % arg)
334 if arg == '*':
352 if arg == '*':
335 star = {}
353 star = {}
336 for k in xrange(int(l)):
354 for k in xrange(int(l)):
337 argline = self._fin.readline()[:-1]
355 argline = self._fin.readline()[:-1]
338 arg, l = argline.split()
356 arg, l = argline.split()
339 val = self._fin.read(int(l))
357 val = self._fin.read(int(l))
340 star[arg] = val
358 star[arg] = val
341 data['*'] = star
359 data['*'] = star
342 else:
360 else:
343 val = self._fin.read(int(l))
361 val = self._fin.read(int(l))
344 data[arg] = val
362 data[arg] = val
345 return [data[k] for k in keys]
363 return [data[k] for k in keys]
346
364
347 def forwardpayload(self, fpout):
365 def forwardpayload(self, fpout):
348 # We initially send an empty response. This tells the client it is
366 # We initially send an empty response. This tells the client it is
349 # OK to start sending data. If a client sees any other response, it
367 # OK to start sending data. If a client sees any other response, it
350 # interprets it as an error.
368 # interprets it as an error.
351 _sshv1respondbytes(self._fout, b'')
369 _sshv1respondbytes(self._fout, b'')
352
370
353 # The file is in the form:
371 # The file is in the form:
354 #
372 #
355 # <chunk size>\n<chunk>
373 # <chunk size>\n<chunk>
356 # ...
374 # ...
357 # 0\n
375 # 0\n
358 count = int(self._fin.readline())
376 count = int(self._fin.readline())
359 while count:
377 while count:
360 fpout.write(self._fin.read(count))
378 fpout.write(self._fin.read(count))
361 count = int(self._fin.readline())
379 count = int(self._fin.readline())
362
380
363 @contextlib.contextmanager
381 @contextlib.contextmanager
364 def mayberedirectstdio(self):
382 def mayberedirectstdio(self):
365 yield None
383 yield None
366
384
367 def client(self):
385 def client(self):
368 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
386 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
369 return 'remote:ssh:' + client
387 return 'remote:ssh:' + client
370
388
389 def addcapabilities(self, repo, caps):
390 return caps
391
371 class sshv2protocolhandler(sshv1protocolhandler):
392 class sshv2protocolhandler(sshv1protocolhandler):
372 """Protocol handler for version 2 of the SSH protocol."""
393 """Protocol handler for version 2 of the SSH protocol."""
373
394
374 @property
395 @property
375 def name(self):
396 def name(self):
376 return wireprototypes.SSHV2
397 return wireprototypes.SSHV2
377
398
378 def _runsshserver(ui, repo, fin, fout, ev):
399 def _runsshserver(ui, repo, fin, fout, ev):
379 # This function operates like a state machine of sorts. The following
400 # This function operates like a state machine of sorts. The following
380 # states are defined:
401 # states are defined:
381 #
402 #
382 # protov1-serving
403 # protov1-serving
383 # Server is in protocol version 1 serving mode. Commands arrive on
404 # Server is in protocol version 1 serving mode. Commands arrive on
384 # new lines. These commands are processed in this state, one command
405 # new lines. These commands are processed in this state, one command
385 # after the other.
406 # after the other.
386 #
407 #
387 # protov2-serving
408 # protov2-serving
388 # Server is in protocol version 2 serving mode.
409 # Server is in protocol version 2 serving mode.
389 #
410 #
390 # upgrade-initial
411 # upgrade-initial
391 # The server is going to process an upgrade request.
412 # The server is going to process an upgrade request.
392 #
413 #
393 # upgrade-v2-filter-legacy-handshake
414 # upgrade-v2-filter-legacy-handshake
394 # The protocol is being upgraded to version 2. The server is expecting
415 # The protocol is being upgraded to version 2. The server is expecting
395 # the legacy handshake from version 1.
416 # the legacy handshake from version 1.
396 #
417 #
397 # upgrade-v2-finish
418 # upgrade-v2-finish
398 # The upgrade to version 2 of the protocol is imminent.
419 # The upgrade to version 2 of the protocol is imminent.
399 #
420 #
400 # shutdown
421 # shutdown
401 # The server is shutting down, possibly in reaction to a client event.
422 # The server is shutting down, possibly in reaction to a client event.
402 #
423 #
403 # And here are their transitions:
424 # And here are their transitions:
404 #
425 #
405 # protov1-serving -> shutdown
426 # protov1-serving -> shutdown
406 # When server receives an empty request or encounters another
427 # When server receives an empty request or encounters another
407 # error.
428 # error.
408 #
429 #
409 # protov1-serving -> upgrade-initial
430 # protov1-serving -> upgrade-initial
410 # An upgrade request line was seen.
431 # An upgrade request line was seen.
411 #
432 #
412 # upgrade-initial -> upgrade-v2-filter-legacy-handshake
433 # upgrade-initial -> upgrade-v2-filter-legacy-handshake
413 # Upgrade to version 2 in progress. Server is expecting to
434 # Upgrade to version 2 in progress. Server is expecting to
414 # process a legacy handshake.
435 # process a legacy handshake.
415 #
436 #
416 # upgrade-v2-filter-legacy-handshake -> shutdown
437 # upgrade-v2-filter-legacy-handshake -> shutdown
417 # Client did not fulfill upgrade handshake requirements.
438 # Client did not fulfill upgrade handshake requirements.
418 #
439 #
419 # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
440 # upgrade-v2-filter-legacy-handshake -> upgrade-v2-finish
420 # Client fulfilled version 2 upgrade requirements. Finishing that
441 # Client fulfilled version 2 upgrade requirements. Finishing that
421 # upgrade.
442 # upgrade.
422 #
443 #
423 # upgrade-v2-finish -> protov2-serving
444 # upgrade-v2-finish -> protov2-serving
424 # Protocol upgrade to version 2 complete. Server can now speak protocol
445 # Protocol upgrade to version 2 complete. Server can now speak protocol
425 # version 2.
446 # version 2.
426 #
447 #
427 # protov2-serving -> protov1-serving
448 # protov2-serving -> protov1-serving
428 # Ths happens by default since protocol version 2 is the same as
449 # Ths happens by default since protocol version 2 is the same as
429 # version 1 except for the handshake.
450 # version 1 except for the handshake.
430
451
431 state = 'protov1-serving'
452 state = 'protov1-serving'
432 proto = sshv1protocolhandler(ui, fin, fout)
453 proto = sshv1protocolhandler(ui, fin, fout)
433 protoswitched = False
454 protoswitched = False
434
455
435 while not ev.is_set():
456 while not ev.is_set():
436 if state == 'protov1-serving':
457 if state == 'protov1-serving':
437 # Commands are issued on new lines.
458 # Commands are issued on new lines.
438 request = fin.readline()[:-1]
459 request = fin.readline()[:-1]
439
460
440 # Empty lines signal to terminate the connection.
461 # Empty lines signal to terminate the connection.
441 if not request:
462 if not request:
442 state = 'shutdown'
463 state = 'shutdown'
443 continue
464 continue
444
465
445 # It looks like a protocol upgrade request. Transition state to
466 # It looks like a protocol upgrade request. Transition state to
446 # handle it.
467 # handle it.
447 if request.startswith(b'upgrade '):
468 if request.startswith(b'upgrade '):
448 if protoswitched:
469 if protoswitched:
449 _sshv1respondooberror(fout, ui.ferr,
470 _sshv1respondooberror(fout, ui.ferr,
450 b'cannot upgrade protocols multiple '
471 b'cannot upgrade protocols multiple '
451 b'times')
472 b'times')
452 state = 'shutdown'
473 state = 'shutdown'
453 continue
474 continue
454
475
455 state = 'upgrade-initial'
476 state = 'upgrade-initial'
456 continue
477 continue
457
478
458 available = wireproto.commands.commandavailable(request, proto)
479 available = wireproto.commands.commandavailable(request, proto)
459
480
460 # This command isn't available. Send an empty response and go
481 # This command isn't available. Send an empty response and go
461 # back to waiting for a new command.
482 # back to waiting for a new command.
462 if not available:
483 if not available:
463 _sshv1respondbytes(fout, b'')
484 _sshv1respondbytes(fout, b'')
464 continue
485 continue
465
486
466 rsp = wireproto.dispatch(repo, proto, request)
487 rsp = wireproto.dispatch(repo, proto, request)
467
488
468 if isinstance(rsp, bytes):
489 if isinstance(rsp, bytes):
469 _sshv1respondbytes(fout, rsp)
490 _sshv1respondbytes(fout, rsp)
470 elif isinstance(rsp, wireprototypes.bytesresponse):
491 elif isinstance(rsp, wireprototypes.bytesresponse):
471 _sshv1respondbytes(fout, rsp.data)
492 _sshv1respondbytes(fout, rsp.data)
472 elif isinstance(rsp, wireprototypes.streamres):
493 elif isinstance(rsp, wireprototypes.streamres):
473 _sshv1respondstream(fout, rsp)
494 _sshv1respondstream(fout, rsp)
474 elif isinstance(rsp, wireprototypes.streamreslegacy):
495 elif isinstance(rsp, wireprototypes.streamreslegacy):
475 _sshv1respondstream(fout, rsp)
496 _sshv1respondstream(fout, rsp)
476 elif isinstance(rsp, wireprototypes.pushres):
497 elif isinstance(rsp, wireprototypes.pushres):
477 _sshv1respondbytes(fout, b'')
498 _sshv1respondbytes(fout, b'')
478 _sshv1respondbytes(fout, b'%d' % rsp.res)
499 _sshv1respondbytes(fout, b'%d' % rsp.res)
479 elif isinstance(rsp, wireprototypes.pusherr):
500 elif isinstance(rsp, wireprototypes.pusherr):
480 _sshv1respondbytes(fout, rsp.res)
501 _sshv1respondbytes(fout, rsp.res)
481 elif isinstance(rsp, wireprototypes.ooberror):
502 elif isinstance(rsp, wireprototypes.ooberror):
482 _sshv1respondooberror(fout, ui.ferr, rsp.message)
503 _sshv1respondooberror(fout, ui.ferr, rsp.message)
483 else:
504 else:
484 raise error.ProgrammingError('unhandled response type from '
505 raise error.ProgrammingError('unhandled response type from '
485 'wire protocol command: %s' % rsp)
506 'wire protocol command: %s' % rsp)
486
507
487 # For now, protocol version 2 serving just goes back to version 1.
508 # For now, protocol version 2 serving just goes back to version 1.
488 elif state == 'protov2-serving':
509 elif state == 'protov2-serving':
489 state = 'protov1-serving'
510 state = 'protov1-serving'
490 continue
511 continue
491
512
492 elif state == 'upgrade-initial':
513 elif state == 'upgrade-initial':
493 # We should never transition into this state if we've switched
514 # We should never transition into this state if we've switched
494 # protocols.
515 # protocols.
495 assert not protoswitched
516 assert not protoswitched
496 assert proto.name == wireprototypes.SSHV1
517 assert proto.name == wireprototypes.SSHV1
497
518
498 # Expected: upgrade <token> <capabilities>
519 # Expected: upgrade <token> <capabilities>
499 # If we get something else, the request is malformed. It could be
520 # If we get something else, the request is malformed. It could be
500 # from a future client that has altered the upgrade line content.
521 # from a future client that has altered the upgrade line content.
501 # We treat this as an unknown command.
522 # We treat this as an unknown command.
502 try:
523 try:
503 token, caps = request.split(b' ')[1:]
524 token, caps = request.split(b' ')[1:]
504 except ValueError:
525 except ValueError:
505 _sshv1respondbytes(fout, b'')
526 _sshv1respondbytes(fout, b'')
506 state = 'protov1-serving'
527 state = 'protov1-serving'
507 continue
528 continue
508
529
509 # Send empty response if we don't support upgrading protocols.
530 # Send empty response if we don't support upgrading protocols.
510 if not ui.configbool('experimental', 'sshserver.support-v2'):
531 if not ui.configbool('experimental', 'sshserver.support-v2'):
511 _sshv1respondbytes(fout, b'')
532 _sshv1respondbytes(fout, b'')
512 state = 'protov1-serving'
533 state = 'protov1-serving'
513 continue
534 continue
514
535
515 try:
536 try:
516 caps = urlreq.parseqs(caps)
537 caps = urlreq.parseqs(caps)
517 except ValueError:
538 except ValueError:
518 _sshv1respondbytes(fout, b'')
539 _sshv1respondbytes(fout, b'')
519 state = 'protov1-serving'
540 state = 'protov1-serving'
520 continue
541 continue
521
542
522 # We don't see an upgrade request to protocol version 2. Ignore
543 # We don't see an upgrade request to protocol version 2. Ignore
523 # the upgrade request.
544 # the upgrade request.
524 wantedprotos = caps.get(b'proto', [b''])[0]
545 wantedprotos = caps.get(b'proto', [b''])[0]
525 if SSHV2 not in wantedprotos:
546 if SSHV2 not in wantedprotos:
526 _sshv1respondbytes(fout, b'')
547 _sshv1respondbytes(fout, b'')
527 state = 'protov1-serving'
548 state = 'protov1-serving'
528 continue
549 continue
529
550
530 # It looks like we can honor this upgrade request to protocol 2.
551 # It looks like we can honor this upgrade request to protocol 2.
531 # Filter the rest of the handshake protocol request lines.
552 # Filter the rest of the handshake protocol request lines.
532 state = 'upgrade-v2-filter-legacy-handshake'
553 state = 'upgrade-v2-filter-legacy-handshake'
533 continue
554 continue
534
555
535 elif state == 'upgrade-v2-filter-legacy-handshake':
556 elif state == 'upgrade-v2-filter-legacy-handshake':
536 # Client should have sent legacy handshake after an ``upgrade``
557 # Client should have sent legacy handshake after an ``upgrade``
537 # request. Expected lines:
558 # request. Expected lines:
538 #
559 #
539 # hello
560 # hello
540 # between
561 # between
541 # pairs 81
562 # pairs 81
542 # 0000...-0000...
563 # 0000...-0000...
543
564
544 ok = True
565 ok = True
545 for line in (b'hello', b'between', b'pairs 81'):
566 for line in (b'hello', b'between', b'pairs 81'):
546 request = fin.readline()[:-1]
567 request = fin.readline()[:-1]
547
568
548 if request != line:
569 if request != line:
549 _sshv1respondooberror(fout, ui.ferr,
570 _sshv1respondooberror(fout, ui.ferr,
550 b'malformed handshake protocol: '
571 b'malformed handshake protocol: '
551 b'missing %s' % line)
572 b'missing %s' % line)
552 ok = False
573 ok = False
553 state = 'shutdown'
574 state = 'shutdown'
554 break
575 break
555
576
556 if not ok:
577 if not ok:
557 continue
578 continue
558
579
559 request = fin.read(81)
580 request = fin.read(81)
560 if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
581 if request != b'%s-%s' % (b'0' * 40, b'0' * 40):
561 _sshv1respondooberror(fout, ui.ferr,
582 _sshv1respondooberror(fout, ui.ferr,
562 b'malformed handshake protocol: '
583 b'malformed handshake protocol: '
563 b'missing between argument value')
584 b'missing between argument value')
564 state = 'shutdown'
585 state = 'shutdown'
565 continue
586 continue
566
587
567 state = 'upgrade-v2-finish'
588 state = 'upgrade-v2-finish'
568 continue
589 continue
569
590
570 elif state == 'upgrade-v2-finish':
591 elif state == 'upgrade-v2-finish':
571 # Send the upgrade response.
592 # Send the upgrade response.
572 fout.write(b'upgraded %s %s\n' % (token, SSHV2))
593 fout.write(b'upgraded %s %s\n' % (token, SSHV2))
573 servercaps = wireproto.capabilities(repo, proto)
594 servercaps = wireproto.capabilities(repo, proto)
574 rsp = b'capabilities: %s' % servercaps.data
595 rsp = b'capabilities: %s' % servercaps.data
575 fout.write(b'%d\n%s\n' % (len(rsp), rsp))
596 fout.write(b'%d\n%s\n' % (len(rsp), rsp))
576 fout.flush()
597 fout.flush()
577
598
578 proto = sshv2protocolhandler(ui, fin, fout)
599 proto = sshv2protocolhandler(ui, fin, fout)
579 protoswitched = True
600 protoswitched = True
580
601
581 state = 'protov2-serving'
602 state = 'protov2-serving'
582 continue
603 continue
583
604
584 elif state == 'shutdown':
605 elif state == 'shutdown':
585 break
606 break
586
607
587 else:
608 else:
588 raise error.ProgrammingError('unhandled ssh server state: %s' %
609 raise error.ProgrammingError('unhandled ssh server state: %s' %
589 state)
610 state)
590
611
591 class sshserver(object):
612 class sshserver(object):
592 def __init__(self, ui, repo, logfh=None):
613 def __init__(self, ui, repo, logfh=None):
593 self._ui = ui
614 self._ui = ui
594 self._repo = repo
615 self._repo = repo
595 self._fin = ui.fin
616 self._fin = ui.fin
596 self._fout = ui.fout
617 self._fout = ui.fout
597
618
598 # Log write I/O to stdout and stderr if configured.
619 # Log write I/O to stdout and stderr if configured.
599 if logfh:
620 if logfh:
600 self._fout = util.makeloggingfileobject(
621 self._fout = util.makeloggingfileobject(
601 logfh, self._fout, 'o', logdata=True)
622 logfh, self._fout, 'o', logdata=True)
602 ui.ferr = util.makeloggingfileobject(
623 ui.ferr = util.makeloggingfileobject(
603 logfh, ui.ferr, 'e', logdata=True)
624 logfh, ui.ferr, 'e', logdata=True)
604
625
605 hook.redirect(True)
626 hook.redirect(True)
606 ui.fout = repo.ui.fout = ui.ferr
627 ui.fout = repo.ui.fout = ui.ferr
607
628
608 # Prevent insertion/deletion of CRs
629 # Prevent insertion/deletion of CRs
609 util.setbinary(self._fin)
630 util.setbinary(self._fin)
610 util.setbinary(self._fout)
631 util.setbinary(self._fout)
611
632
612 def serve_forever(self):
633 def serve_forever(self):
613 self.serveuntil(threading.Event())
634 self.serveuntil(threading.Event())
614 sys.exit(0)
635 sys.exit(0)
615
636
616 def serveuntil(self, ev):
637 def serveuntil(self, ev):
617 """Serve until a threading.Event is set."""
638 """Serve until a threading.Event is set."""
618 _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
639 _runsshserver(self._ui, self._repo, self._fin, self._fout, ev)
@@ -1,139 +1,148 b''
1 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
1 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
2 #
2 #
3 # This software may be used and distributed according to the terms of the
3 # This software may be used and distributed according to the terms of the
4 # GNU General Public License version 2 or any later version.
4 # GNU General Public License version 2 or any later version.
5
5
6 from __future__ import absolute_import
6 from __future__ import absolute_import
7
7
8 import abc
8 import abc
9
9
10 # Names of the SSH protocol implementations.
10 # Names of the SSH protocol implementations.
11 SSHV1 = 'ssh-v1'
11 SSHV1 = 'ssh-v1'
12 # This is advertised over the wire. Incremental the counter at the end
12 # This is advertised over the wire. Incremental the counter at the end
13 # to reflect BC breakages.
13 # to reflect BC breakages.
14 SSHV2 = 'exp-ssh-v2-0001'
14 SSHV2 = 'exp-ssh-v2-0001'
15
15
16 # All available wire protocol transports.
16 # All available wire protocol transports.
17 TRANSPORTS = {
17 TRANSPORTS = {
18 SSHV1: {
18 SSHV1: {
19 'transport': 'ssh',
19 'transport': 'ssh',
20 'version': 1,
20 'version': 1,
21 },
21 },
22 SSHV2: {
22 SSHV2: {
23 'transport': 'ssh',
23 'transport': 'ssh',
24 'version': 2,
24 'version': 2,
25 },
25 },
26 'http-v1': {
26 'http-v1': {
27 'transport': 'http',
27 'transport': 'http',
28 'version': 1,
28 'version': 1,
29 }
29 }
30 }
30 }
31
31
32 class bytesresponse(object):
32 class bytesresponse(object):
33 """A wire protocol response consisting of raw bytes."""
33 """A wire protocol response consisting of raw bytes."""
34 def __init__(self, data):
34 def __init__(self, data):
35 self.data = data
35 self.data = data
36
36
37 class ooberror(object):
37 class ooberror(object):
38 """wireproto reply: failure of a batch of operation
38 """wireproto reply: failure of a batch of operation
39
39
40 Something failed during a batch call. The error message is stored in
40 Something failed during a batch call. The error message is stored in
41 `self.message`.
41 `self.message`.
42 """
42 """
43 def __init__(self, message):
43 def __init__(self, message):
44 self.message = message
44 self.message = message
45
45
46 class pushres(object):
46 class pushres(object):
47 """wireproto reply: success with simple integer return
47 """wireproto reply: success with simple integer return
48
48
49 The call was successful and returned an integer contained in `self.res`.
49 The call was successful and returned an integer contained in `self.res`.
50 """
50 """
51 def __init__(self, res, output):
51 def __init__(self, res, output):
52 self.res = res
52 self.res = res
53 self.output = output
53 self.output = output
54
54
55 class pusherr(object):
55 class pusherr(object):
56 """wireproto reply: failure
56 """wireproto reply: failure
57
57
58 The call failed. The `self.res` attribute contains the error message.
58 The call failed. The `self.res` attribute contains the error message.
59 """
59 """
60 def __init__(self, res, output):
60 def __init__(self, res, output):
61 self.res = res
61 self.res = res
62 self.output = output
62 self.output = output
63
63
64 class streamres(object):
64 class streamres(object):
65 """wireproto reply: binary stream
65 """wireproto reply: binary stream
66
66
67 The call was successful and the result is a stream.
67 The call was successful and the result is a stream.
68
68
69 Accepts a generator containing chunks of data to be sent to the client.
69 Accepts a generator containing chunks of data to be sent to the client.
70
70
71 ``prefer_uncompressed`` indicates that the data is expected to be
71 ``prefer_uncompressed`` indicates that the data is expected to be
72 uncompressable and that the stream should therefore use the ``none``
72 uncompressable and that the stream should therefore use the ``none``
73 engine.
73 engine.
74 """
74 """
75 def __init__(self, gen=None, prefer_uncompressed=False):
75 def __init__(self, gen=None, prefer_uncompressed=False):
76 self.gen = gen
76 self.gen = gen
77 self.prefer_uncompressed = prefer_uncompressed
77 self.prefer_uncompressed = prefer_uncompressed
78
78
79 class streamreslegacy(object):
79 class streamreslegacy(object):
80 """wireproto reply: uncompressed binary stream
80 """wireproto reply: uncompressed binary stream
81
81
82 The call was successful and the result is a stream.
82 The call was successful and the result is a stream.
83
83
84 Accepts a generator containing chunks of data to be sent to the client.
84 Accepts a generator containing chunks of data to be sent to the client.
85
85
86 Like ``streamres``, but sends an uncompressed data for "version 1" clients
86 Like ``streamres``, but sends an uncompressed data for "version 1" clients
87 using the application/mercurial-0.1 media type.
87 using the application/mercurial-0.1 media type.
88 """
88 """
89 def __init__(self, gen=None):
89 def __init__(self, gen=None):
90 self.gen = gen
90 self.gen = gen
91
91
92 class baseprotocolhandler(object):
92 class baseprotocolhandler(object):
93 """Abstract base class for wire protocol handlers.
93 """Abstract base class for wire protocol handlers.
94
94
95 A wire protocol handler serves as an interface between protocol command
95 A wire protocol handler serves as an interface between protocol command
96 handlers and the wire protocol transport layer. Protocol handlers provide
96 handlers and the wire protocol transport layer. Protocol handlers provide
97 methods to read command arguments, redirect stdio for the duration of
97 methods to read command arguments, redirect stdio for the duration of
98 the request, handle response types, etc.
98 the request, handle response types, etc.
99 """
99 """
100
100
101 __metaclass__ = abc.ABCMeta
101 __metaclass__ = abc.ABCMeta
102
102
103 @abc.abstractproperty
103 @abc.abstractproperty
104 def name(self):
104 def name(self):
105 """The name of the protocol implementation.
105 """The name of the protocol implementation.
106
106
107 Used for uniquely identifying the transport type.
107 Used for uniquely identifying the transport type.
108 """
108 """
109
109
110 @abc.abstractmethod
110 @abc.abstractmethod
111 def getargs(self, args):
111 def getargs(self, args):
112 """return the value for arguments in <args>
112 """return the value for arguments in <args>
113
113
114 returns a list of values (same order as <args>)"""
114 returns a list of values (same order as <args>)"""
115
115
116 @abc.abstractmethod
116 @abc.abstractmethod
117 def forwardpayload(self, fp):
117 def forwardpayload(self, fp):
118 """Read the raw payload and forward to a file.
118 """Read the raw payload and forward to a file.
119
119
120 The payload is read in full before the function returns.
120 The payload is read in full before the function returns.
121 """
121 """
122
122
123 @abc.abstractmethod
123 @abc.abstractmethod
124 def mayberedirectstdio(self):
124 def mayberedirectstdio(self):
125 """Context manager to possibly redirect stdio.
125 """Context manager to possibly redirect stdio.
126
126
127 The context manager yields a file-object like object that receives
127 The context manager yields a file-object like object that receives
128 stdout and stderr output when the context manager is active. Or it
128 stdout and stderr output when the context manager is active. Or it
129 yields ``None`` if no I/O redirection occurs.
129 yields ``None`` if no I/O redirection occurs.
130
130
131 The intent of this context manager is to capture stdio output
131 The intent of this context manager is to capture stdio output
132 so it may be sent in the response. Some transports support streaming
132 so it may be sent in the response. Some transports support streaming
133 stdio to the client in real time. For these transports, stdio output
133 stdio to the client in real time. For these transports, stdio output
134 won't be captured.
134 won't be captured.
135 """
135 """
136
136
137 @abc.abstractmethod
137 @abc.abstractmethod
138 def client(self):
138 def client(self):
139 """Returns a string representation of this client (as bytes)."""
139 """Returns a string representation of this client (as bytes)."""
140
141 @abc.abstractmethod
142 def addcapabilities(self, repo, caps):
143 """Adds advertised capabilities specific to this protocol.
144
145 Receives the list of capabilities collected so far.
146
147 Returns a list of capabilities. The passed in argument can be returned.
148 """
General Comments 0
You need to be logged in to leave comments. Login now