##// END OF EJS Templates
wireprototypes: move wire protocol response types to new module...
Gregory Szorc -
r36090:cd6ab329 default
parent child Browse files
Show More
@@ -0,0 +1,61 b''
1 # Copyright 2018 Gregory Szorc <gregory.szorc@gmail.com>
2 #
3 # This software may be used and distributed according to the terms of the
4 # GNU General Public License version 2 or any later version.
5
6 from __future__ import absolute_import
7
8 class ooberror(object):
9 """wireproto reply: failure of a batch of operation
10
11 Something failed during a batch call. The error message is stored in
12 `self.message`.
13 """
14 def __init__(self, message):
15 self.message = message
16
17 class pushres(object):
18 """wireproto reply: success with simple integer return
19
20 The call was successful and returned an integer contained in `self.res`.
21 """
22 def __init__(self, res, output):
23 self.res = res
24 self.output = output
25
26 class pusherr(object):
27 """wireproto reply: failure
28
29 The call failed. The `self.res` attribute contains the error message.
30 """
31 def __init__(self, res, output):
32 self.res = res
33 self.output = output
34
35 class streamres(object):
36 """wireproto reply: binary stream
37
38 The call was successful and the result is a stream.
39
40 Accepts a generator containing chunks of data to be sent to the client.
41
42 ``prefer_uncompressed`` indicates that the data is expected to be
43 uncompressable and that the stream should therefore use the ``none``
44 engine.
45 """
46 def __init__(self, gen=None, prefer_uncompressed=False):
47 self.gen = gen
48 self.prefer_uncompressed = prefer_uncompressed
49
50 class streamreslegacy(object):
51 """wireproto reply: uncompressed binary stream
52
53 The call was successful and the result is a stream.
54
55 Accepts a generator containing chunks of data to be sent to the client.
56
57 Like ``streamres``, but sends an uncompressed data for "version 1" clients
58 using the application/mercurial-0.1 media type.
59 """
60 def __init__(self, gen=None):
61 self.gen = gen
@@ -1,1096 +1,1049 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import os
11 import os
12 import tempfile
12 import tempfile
13
13
14 from .i18n import _
14 from .i18n import _
15 from .node import (
15 from .node import (
16 bin,
16 bin,
17 hex,
17 hex,
18 nullid,
18 nullid,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 discovery,
24 discovery,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 pycompat,
30 pycompat,
31 repository,
31 repository,
32 streamclone,
32 streamclone,
33 util,
33 util,
34 wireprototypes,
34 )
35 )
35
36
36 urlerr = util.urlerr
37 urlerr = util.urlerr
37 urlreq = util.urlreq
38 urlreq = util.urlreq
38
39
40 ooberror = wireprototypes.ooberror
41 pushres = wireprototypes.pushres
42 pusherr = wireprototypes.pusherr
43 streamres = wireprototypes.streamres
44 streamres_legacy = wireprototypes.streamreslegacy
45
39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
46 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
47 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 'IncompatibleClient')
48 'IncompatibleClient')
42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
49 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43
50
44 class remoteiterbatcher(peer.iterbatcher):
51 class remoteiterbatcher(peer.iterbatcher):
45 def __init__(self, remote):
52 def __init__(self, remote):
46 super(remoteiterbatcher, self).__init__()
53 super(remoteiterbatcher, self).__init__()
47 self._remote = remote
54 self._remote = remote
48
55
49 def __getattr__(self, name):
56 def __getattr__(self, name):
50 # Validate this method is batchable, since submit() only supports
57 # Validate this method is batchable, since submit() only supports
51 # batchable methods.
58 # batchable methods.
52 fn = getattr(self._remote, name)
59 fn = getattr(self._remote, name)
53 if not getattr(fn, 'batchable', None):
60 if not getattr(fn, 'batchable', None):
54 raise error.ProgrammingError('Attempted to batch a non-batchable '
61 raise error.ProgrammingError('Attempted to batch a non-batchable '
55 'call to %r' % name)
62 'call to %r' % name)
56
63
57 return super(remoteiterbatcher, self).__getattr__(name)
64 return super(remoteiterbatcher, self).__getattr__(name)
58
65
59 def submit(self):
66 def submit(self):
60 """Break the batch request into many patch calls and pipeline them.
67 """Break the batch request into many patch calls and pipeline them.
61
68
62 This is mostly valuable over http where request sizes can be
69 This is mostly valuable over http where request sizes can be
63 limited, but can be used in other places as well.
70 limited, but can be used in other places as well.
64 """
71 """
65 # 2-tuple of (command, arguments) that represents what will be
72 # 2-tuple of (command, arguments) that represents what will be
66 # sent over the wire.
73 # sent over the wire.
67 requests = []
74 requests = []
68
75
69 # 4-tuple of (command, final future, @batchable generator, remote
76 # 4-tuple of (command, final future, @batchable generator, remote
70 # future).
77 # future).
71 results = []
78 results = []
72
79
73 for command, args, opts, finalfuture in self.calls:
80 for command, args, opts, finalfuture in self.calls:
74 mtd = getattr(self._remote, command)
81 mtd = getattr(self._remote, command)
75 batchable = mtd.batchable(mtd.__self__, *args, **opts)
82 batchable = mtd.batchable(mtd.__self__, *args, **opts)
76
83
77 commandargs, fremote = next(batchable)
84 commandargs, fremote = next(batchable)
78 assert fremote
85 assert fremote
79 requests.append((command, commandargs))
86 requests.append((command, commandargs))
80 results.append((command, finalfuture, batchable, fremote))
87 results.append((command, finalfuture, batchable, fremote))
81
88
82 if requests:
89 if requests:
83 self._resultiter = self._remote._submitbatch(requests)
90 self._resultiter = self._remote._submitbatch(requests)
84
91
85 self._results = results
92 self._results = results
86
93
87 def results(self):
94 def results(self):
88 for command, finalfuture, batchable, remotefuture in self._results:
95 for command, finalfuture, batchable, remotefuture in self._results:
89 # Get the raw result, set it in the remote future, feed it
96 # Get the raw result, set it in the remote future, feed it
90 # back into the @batchable generator so it can be decoded, and
97 # back into the @batchable generator so it can be decoded, and
91 # set the result on the final future to this value.
98 # set the result on the final future to this value.
92 remoteresult = next(self._resultiter)
99 remoteresult = next(self._resultiter)
93 remotefuture.set(remoteresult)
100 remotefuture.set(remoteresult)
94 finalfuture.set(next(batchable))
101 finalfuture.set(next(batchable))
95
102
96 # Verify our @batchable generators only emit 2 values.
103 # Verify our @batchable generators only emit 2 values.
97 try:
104 try:
98 next(batchable)
105 next(batchable)
99 except StopIteration:
106 except StopIteration:
100 pass
107 pass
101 else:
108 else:
102 raise error.ProgrammingError('%s @batchable generator emitted '
109 raise error.ProgrammingError('%s @batchable generator emitted '
103 'unexpected value count' % command)
110 'unexpected value count' % command)
104
111
105 yield finalfuture.value
112 yield finalfuture.value
106
113
107 # Forward a couple of names from peer to make wireproto interactions
114 # Forward a couple of names from peer to make wireproto interactions
108 # slightly more sensible.
115 # slightly more sensible.
109 batchable = peer.batchable
116 batchable = peer.batchable
110 future = peer.future
117 future = peer.future
111
118
112 # list of nodes encoding / decoding
119 # list of nodes encoding / decoding
113
120
114 def decodelist(l, sep=' '):
121 def decodelist(l, sep=' '):
115 if l:
122 if l:
116 return [bin(v) for v in l.split(sep)]
123 return [bin(v) for v in l.split(sep)]
117 return []
124 return []
118
125
119 def encodelist(l, sep=' '):
126 def encodelist(l, sep=' '):
120 try:
127 try:
121 return sep.join(map(hex, l))
128 return sep.join(map(hex, l))
122 except TypeError:
129 except TypeError:
123 raise
130 raise
124
131
125 # batched call argument encoding
132 # batched call argument encoding
126
133
127 def escapearg(plain):
134 def escapearg(plain):
128 return (plain
135 return (plain
129 .replace(':', ':c')
136 .replace(':', ':c')
130 .replace(',', ':o')
137 .replace(',', ':o')
131 .replace(';', ':s')
138 .replace(';', ':s')
132 .replace('=', ':e'))
139 .replace('=', ':e'))
133
140
134 def unescapearg(escaped):
141 def unescapearg(escaped):
135 return (escaped
142 return (escaped
136 .replace(':e', '=')
143 .replace(':e', '=')
137 .replace(':s', ';')
144 .replace(':s', ';')
138 .replace(':o', ',')
145 .replace(':o', ',')
139 .replace(':c', ':'))
146 .replace(':c', ':'))
140
147
141 def encodebatchcmds(req):
148 def encodebatchcmds(req):
142 """Return a ``cmds`` argument value for the ``batch`` command."""
149 """Return a ``cmds`` argument value for the ``batch`` command."""
143 cmds = []
150 cmds = []
144 for op, argsdict in req:
151 for op, argsdict in req:
145 # Old servers didn't properly unescape argument names. So prevent
152 # Old servers didn't properly unescape argument names. So prevent
146 # the sending of argument names that may not be decoded properly by
153 # the sending of argument names that may not be decoded properly by
147 # servers.
154 # servers.
148 assert all(escapearg(k) == k for k in argsdict)
155 assert all(escapearg(k) == k for k in argsdict)
149
156
150 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
157 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
151 for k, v in argsdict.iteritems())
158 for k, v in argsdict.iteritems())
152 cmds.append('%s %s' % (op, args))
159 cmds.append('%s %s' % (op, args))
153
160
154 return ';'.join(cmds)
161 return ';'.join(cmds)
155
162
156 # mapping of options accepted by getbundle and their types
163 # mapping of options accepted by getbundle and their types
157 #
164 #
158 # Meant to be extended by extensions. It is extensions responsibility to ensure
165 # Meant to be extended by extensions. It is extensions responsibility to ensure
159 # such options are properly processed in exchange.getbundle.
166 # such options are properly processed in exchange.getbundle.
160 #
167 #
161 # supported types are:
168 # supported types are:
162 #
169 #
163 # :nodes: list of binary nodes
170 # :nodes: list of binary nodes
164 # :csv: list of comma-separated values
171 # :csv: list of comma-separated values
165 # :scsv: list of comma-separated values return as set
172 # :scsv: list of comma-separated values return as set
166 # :plain: string with no transformation needed.
173 # :plain: string with no transformation needed.
167 gboptsmap = {'heads': 'nodes',
174 gboptsmap = {'heads': 'nodes',
168 'bookmarks': 'boolean',
175 'bookmarks': 'boolean',
169 'common': 'nodes',
176 'common': 'nodes',
170 'obsmarkers': 'boolean',
177 'obsmarkers': 'boolean',
171 'phases': 'boolean',
178 'phases': 'boolean',
172 'bundlecaps': 'scsv',
179 'bundlecaps': 'scsv',
173 'listkeys': 'csv',
180 'listkeys': 'csv',
174 'cg': 'boolean',
181 'cg': 'boolean',
175 'cbattempted': 'boolean',
182 'cbattempted': 'boolean',
176 'stream': 'boolean',
183 'stream': 'boolean',
177 }
184 }
178
185
179 # client side
186 # client side
180
187
181 class wirepeer(repository.legacypeer):
188 class wirepeer(repository.legacypeer):
182 """Client-side interface for communicating with a peer repository.
189 """Client-side interface for communicating with a peer repository.
183
190
184 Methods commonly call wire protocol commands of the same name.
191 Methods commonly call wire protocol commands of the same name.
185
192
186 See also httppeer.py and sshpeer.py for protocol-specific
193 See also httppeer.py and sshpeer.py for protocol-specific
187 implementations of this interface.
194 implementations of this interface.
188 """
195 """
189 # Begin of basewirepeer interface.
196 # Begin of basewirepeer interface.
190
197
191 def iterbatch(self):
198 def iterbatch(self):
192 return remoteiterbatcher(self)
199 return remoteiterbatcher(self)
193
200
194 @batchable
201 @batchable
195 def lookup(self, key):
202 def lookup(self, key):
196 self.requirecap('lookup', _('look up remote revision'))
203 self.requirecap('lookup', _('look up remote revision'))
197 f = future()
204 f = future()
198 yield {'key': encoding.fromlocal(key)}, f
205 yield {'key': encoding.fromlocal(key)}, f
199 d = f.value
206 d = f.value
200 success, data = d[:-1].split(" ", 1)
207 success, data = d[:-1].split(" ", 1)
201 if int(success):
208 if int(success):
202 yield bin(data)
209 yield bin(data)
203 else:
210 else:
204 self._abort(error.RepoError(data))
211 self._abort(error.RepoError(data))
205
212
206 @batchable
213 @batchable
207 def heads(self):
214 def heads(self):
208 f = future()
215 f = future()
209 yield {}, f
216 yield {}, f
210 d = f.value
217 d = f.value
211 try:
218 try:
212 yield decodelist(d[:-1])
219 yield decodelist(d[:-1])
213 except ValueError:
220 except ValueError:
214 self._abort(error.ResponseError(_("unexpected response:"), d))
221 self._abort(error.ResponseError(_("unexpected response:"), d))
215
222
216 @batchable
223 @batchable
217 def known(self, nodes):
224 def known(self, nodes):
218 f = future()
225 f = future()
219 yield {'nodes': encodelist(nodes)}, f
226 yield {'nodes': encodelist(nodes)}, f
220 d = f.value
227 d = f.value
221 try:
228 try:
222 yield [bool(int(b)) for b in d]
229 yield [bool(int(b)) for b in d]
223 except ValueError:
230 except ValueError:
224 self._abort(error.ResponseError(_("unexpected response:"), d))
231 self._abort(error.ResponseError(_("unexpected response:"), d))
225
232
226 @batchable
233 @batchable
227 def branchmap(self):
234 def branchmap(self):
228 f = future()
235 f = future()
229 yield {}, f
236 yield {}, f
230 d = f.value
237 d = f.value
231 try:
238 try:
232 branchmap = {}
239 branchmap = {}
233 for branchpart in d.splitlines():
240 for branchpart in d.splitlines():
234 branchname, branchheads = branchpart.split(' ', 1)
241 branchname, branchheads = branchpart.split(' ', 1)
235 branchname = encoding.tolocal(urlreq.unquote(branchname))
242 branchname = encoding.tolocal(urlreq.unquote(branchname))
236 branchheads = decodelist(branchheads)
243 branchheads = decodelist(branchheads)
237 branchmap[branchname] = branchheads
244 branchmap[branchname] = branchheads
238 yield branchmap
245 yield branchmap
239 except TypeError:
246 except TypeError:
240 self._abort(error.ResponseError(_("unexpected response:"), d))
247 self._abort(error.ResponseError(_("unexpected response:"), d))
241
248
242 @batchable
249 @batchable
243 def listkeys(self, namespace):
250 def listkeys(self, namespace):
244 if not self.capable('pushkey'):
251 if not self.capable('pushkey'):
245 yield {}, None
252 yield {}, None
246 f = future()
253 f = future()
247 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
254 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
248 yield {'namespace': encoding.fromlocal(namespace)}, f
255 yield {'namespace': encoding.fromlocal(namespace)}, f
249 d = f.value
256 d = f.value
250 self.ui.debug('received listkey for "%s": %i bytes\n'
257 self.ui.debug('received listkey for "%s": %i bytes\n'
251 % (namespace, len(d)))
258 % (namespace, len(d)))
252 yield pushkeymod.decodekeys(d)
259 yield pushkeymod.decodekeys(d)
253
260
254 @batchable
261 @batchable
255 def pushkey(self, namespace, key, old, new):
262 def pushkey(self, namespace, key, old, new):
256 if not self.capable('pushkey'):
263 if not self.capable('pushkey'):
257 yield False, None
264 yield False, None
258 f = future()
265 f = future()
259 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
266 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
260 yield {'namespace': encoding.fromlocal(namespace),
267 yield {'namespace': encoding.fromlocal(namespace),
261 'key': encoding.fromlocal(key),
268 'key': encoding.fromlocal(key),
262 'old': encoding.fromlocal(old),
269 'old': encoding.fromlocal(old),
263 'new': encoding.fromlocal(new)}, f
270 'new': encoding.fromlocal(new)}, f
264 d = f.value
271 d = f.value
265 d, output = d.split('\n', 1)
272 d, output = d.split('\n', 1)
266 try:
273 try:
267 d = bool(int(d))
274 d = bool(int(d))
268 except ValueError:
275 except ValueError:
269 raise error.ResponseError(
276 raise error.ResponseError(
270 _('push failed (unexpected response):'), d)
277 _('push failed (unexpected response):'), d)
271 for l in output.splitlines(True):
278 for l in output.splitlines(True):
272 self.ui.status(_('remote: '), l)
279 self.ui.status(_('remote: '), l)
273 yield d
280 yield d
274
281
275 def stream_out(self):
282 def stream_out(self):
276 return self._callstream('stream_out')
283 return self._callstream('stream_out')
277
284
278 def getbundle(self, source, **kwargs):
285 def getbundle(self, source, **kwargs):
279 kwargs = pycompat.byteskwargs(kwargs)
286 kwargs = pycompat.byteskwargs(kwargs)
280 self.requirecap('getbundle', _('look up remote changes'))
287 self.requirecap('getbundle', _('look up remote changes'))
281 opts = {}
288 opts = {}
282 bundlecaps = kwargs.get('bundlecaps')
289 bundlecaps = kwargs.get('bundlecaps')
283 if bundlecaps is not None:
290 if bundlecaps is not None:
284 kwargs['bundlecaps'] = sorted(bundlecaps)
291 kwargs['bundlecaps'] = sorted(bundlecaps)
285 else:
292 else:
286 bundlecaps = () # kwargs could have it to None
293 bundlecaps = () # kwargs could have it to None
287 for key, value in kwargs.iteritems():
294 for key, value in kwargs.iteritems():
288 if value is None:
295 if value is None:
289 continue
296 continue
290 keytype = gboptsmap.get(key)
297 keytype = gboptsmap.get(key)
291 if keytype is None:
298 if keytype is None:
292 raise error.ProgrammingError(
299 raise error.ProgrammingError(
293 'Unexpectedly None keytype for key %s' % key)
300 'Unexpectedly None keytype for key %s' % key)
294 elif keytype == 'nodes':
301 elif keytype == 'nodes':
295 value = encodelist(value)
302 value = encodelist(value)
296 elif keytype in ('csv', 'scsv'):
303 elif keytype in ('csv', 'scsv'):
297 value = ','.join(value)
304 value = ','.join(value)
298 elif keytype == 'boolean':
305 elif keytype == 'boolean':
299 value = '%i' % bool(value)
306 value = '%i' % bool(value)
300 elif keytype != 'plain':
307 elif keytype != 'plain':
301 raise KeyError('unknown getbundle option type %s'
308 raise KeyError('unknown getbundle option type %s'
302 % keytype)
309 % keytype)
303 opts[key] = value
310 opts[key] = value
304 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
311 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
305 if any((cap.startswith('HG2') for cap in bundlecaps)):
312 if any((cap.startswith('HG2') for cap in bundlecaps)):
306 return bundle2.getunbundler(self.ui, f)
313 return bundle2.getunbundler(self.ui, f)
307 else:
314 else:
308 return changegroupmod.cg1unpacker(f, 'UN')
315 return changegroupmod.cg1unpacker(f, 'UN')
309
316
310 def unbundle(self, cg, heads, url):
317 def unbundle(self, cg, heads, url):
311 '''Send cg (a readable file-like object representing the
318 '''Send cg (a readable file-like object representing the
312 changegroup to push, typically a chunkbuffer object) to the
319 changegroup to push, typically a chunkbuffer object) to the
313 remote server as a bundle.
320 remote server as a bundle.
314
321
315 When pushing a bundle10 stream, return an integer indicating the
322 When pushing a bundle10 stream, return an integer indicating the
316 result of the push (see changegroup.apply()).
323 result of the push (see changegroup.apply()).
317
324
318 When pushing a bundle20 stream, return a bundle20 stream.
325 When pushing a bundle20 stream, return a bundle20 stream.
319
326
320 `url` is the url the client thinks it's pushing to, which is
327 `url` is the url the client thinks it's pushing to, which is
321 visible to hooks.
328 visible to hooks.
322 '''
329 '''
323
330
324 if heads != ['force'] and self.capable('unbundlehash'):
331 if heads != ['force'] and self.capable('unbundlehash'):
325 heads = encodelist(['hashed',
332 heads = encodelist(['hashed',
326 hashlib.sha1(''.join(sorted(heads))).digest()])
333 hashlib.sha1(''.join(sorted(heads))).digest()])
327 else:
334 else:
328 heads = encodelist(heads)
335 heads = encodelist(heads)
329
336
330 if util.safehasattr(cg, 'deltaheader'):
337 if util.safehasattr(cg, 'deltaheader'):
331 # this a bundle10, do the old style call sequence
338 # this a bundle10, do the old style call sequence
332 ret, output = self._callpush("unbundle", cg, heads=heads)
339 ret, output = self._callpush("unbundle", cg, heads=heads)
333 if ret == "":
340 if ret == "":
334 raise error.ResponseError(
341 raise error.ResponseError(
335 _('push failed:'), output)
342 _('push failed:'), output)
336 try:
343 try:
337 ret = int(ret)
344 ret = int(ret)
338 except ValueError:
345 except ValueError:
339 raise error.ResponseError(
346 raise error.ResponseError(
340 _('push failed (unexpected response):'), ret)
347 _('push failed (unexpected response):'), ret)
341
348
342 for l in output.splitlines(True):
349 for l in output.splitlines(True):
343 self.ui.status(_('remote: '), l)
350 self.ui.status(_('remote: '), l)
344 else:
351 else:
345 # bundle2 push. Send a stream, fetch a stream.
352 # bundle2 push. Send a stream, fetch a stream.
346 stream = self._calltwowaystream('unbundle', cg, heads=heads)
353 stream = self._calltwowaystream('unbundle', cg, heads=heads)
347 ret = bundle2.getunbundler(self.ui, stream)
354 ret = bundle2.getunbundler(self.ui, stream)
348 return ret
355 return ret
349
356
350 # End of basewirepeer interface.
357 # End of basewirepeer interface.
351
358
352 # Begin of baselegacywirepeer interface.
359 # Begin of baselegacywirepeer interface.
353
360
354 def branches(self, nodes):
361 def branches(self, nodes):
355 n = encodelist(nodes)
362 n = encodelist(nodes)
356 d = self._call("branches", nodes=n)
363 d = self._call("branches", nodes=n)
357 try:
364 try:
358 br = [tuple(decodelist(b)) for b in d.splitlines()]
365 br = [tuple(decodelist(b)) for b in d.splitlines()]
359 return br
366 return br
360 except ValueError:
367 except ValueError:
361 self._abort(error.ResponseError(_("unexpected response:"), d))
368 self._abort(error.ResponseError(_("unexpected response:"), d))
362
369
363 def between(self, pairs):
370 def between(self, pairs):
364 batch = 8 # avoid giant requests
371 batch = 8 # avoid giant requests
365 r = []
372 r = []
366 for i in xrange(0, len(pairs), batch):
373 for i in xrange(0, len(pairs), batch):
367 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
374 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
368 d = self._call("between", pairs=n)
375 d = self._call("between", pairs=n)
369 try:
376 try:
370 r.extend(l and decodelist(l) or [] for l in d.splitlines())
377 r.extend(l and decodelist(l) or [] for l in d.splitlines())
371 except ValueError:
378 except ValueError:
372 self._abort(error.ResponseError(_("unexpected response:"), d))
379 self._abort(error.ResponseError(_("unexpected response:"), d))
373 return r
380 return r
374
381
375 def changegroup(self, nodes, kind):
382 def changegroup(self, nodes, kind):
376 n = encodelist(nodes)
383 n = encodelist(nodes)
377 f = self._callcompressable("changegroup", roots=n)
384 f = self._callcompressable("changegroup", roots=n)
378 return changegroupmod.cg1unpacker(f, 'UN')
385 return changegroupmod.cg1unpacker(f, 'UN')
379
386
380 def changegroupsubset(self, bases, heads, kind):
387 def changegroupsubset(self, bases, heads, kind):
381 self.requirecap('changegroupsubset', _('look up remote changes'))
388 self.requirecap('changegroupsubset', _('look up remote changes'))
382 bases = encodelist(bases)
389 bases = encodelist(bases)
383 heads = encodelist(heads)
390 heads = encodelist(heads)
384 f = self._callcompressable("changegroupsubset",
391 f = self._callcompressable("changegroupsubset",
385 bases=bases, heads=heads)
392 bases=bases, heads=heads)
386 return changegroupmod.cg1unpacker(f, 'UN')
393 return changegroupmod.cg1unpacker(f, 'UN')
387
394
388 # End of baselegacywirepeer interface.
395 # End of baselegacywirepeer interface.
389
396
390 def _submitbatch(self, req):
397 def _submitbatch(self, req):
391 """run batch request <req> on the server
398 """run batch request <req> on the server
392
399
393 Returns an iterator of the raw responses from the server.
400 Returns an iterator of the raw responses from the server.
394 """
401 """
395 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
402 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
396 chunk = rsp.read(1024)
403 chunk = rsp.read(1024)
397 work = [chunk]
404 work = [chunk]
398 while chunk:
405 while chunk:
399 while ';' not in chunk and chunk:
406 while ';' not in chunk and chunk:
400 chunk = rsp.read(1024)
407 chunk = rsp.read(1024)
401 work.append(chunk)
408 work.append(chunk)
402 merged = ''.join(work)
409 merged = ''.join(work)
403 while ';' in merged:
410 while ';' in merged:
404 one, merged = merged.split(';', 1)
411 one, merged = merged.split(';', 1)
405 yield unescapearg(one)
412 yield unescapearg(one)
406 chunk = rsp.read(1024)
413 chunk = rsp.read(1024)
407 work = [merged, chunk]
414 work = [merged, chunk]
408 yield unescapearg(''.join(work))
415 yield unescapearg(''.join(work))
409
416
410 def _submitone(self, op, args):
417 def _submitone(self, op, args):
411 return self._call(op, **pycompat.strkwargs(args))
418 return self._call(op, **pycompat.strkwargs(args))
412
419
413 def debugwireargs(self, one, two, three=None, four=None, five=None):
420 def debugwireargs(self, one, two, three=None, four=None, five=None):
414 # don't pass optional arguments left at their default value
421 # don't pass optional arguments left at their default value
415 opts = {}
422 opts = {}
416 if three is not None:
423 if three is not None:
417 opts[r'three'] = three
424 opts[r'three'] = three
418 if four is not None:
425 if four is not None:
419 opts[r'four'] = four
426 opts[r'four'] = four
420 return self._call('debugwireargs', one=one, two=two, **opts)
427 return self._call('debugwireargs', one=one, two=two, **opts)
421
428
422 def _call(self, cmd, **args):
429 def _call(self, cmd, **args):
423 """execute <cmd> on the server
430 """execute <cmd> on the server
424
431
425 The command is expected to return a simple string.
432 The command is expected to return a simple string.
426
433
427 returns the server reply as a string."""
434 returns the server reply as a string."""
428 raise NotImplementedError()
435 raise NotImplementedError()
429
436
430 def _callstream(self, cmd, **args):
437 def _callstream(self, cmd, **args):
431 """execute <cmd> on the server
438 """execute <cmd> on the server
432
439
433 The command is expected to return a stream. Note that if the
440 The command is expected to return a stream. Note that if the
434 command doesn't return a stream, _callstream behaves
441 command doesn't return a stream, _callstream behaves
435 differently for ssh and http peers.
442 differently for ssh and http peers.
436
443
437 returns the server reply as a file like object.
444 returns the server reply as a file like object.
438 """
445 """
439 raise NotImplementedError()
446 raise NotImplementedError()
440
447
441 def _callcompressable(self, cmd, **args):
448 def _callcompressable(self, cmd, **args):
442 """execute <cmd> on the server
449 """execute <cmd> on the server
443
450
444 The command is expected to return a stream.
451 The command is expected to return a stream.
445
452
446 The stream may have been compressed in some implementations. This
453 The stream may have been compressed in some implementations. This
447 function takes care of the decompression. This is the only difference
454 function takes care of the decompression. This is the only difference
448 with _callstream.
455 with _callstream.
449
456
450 returns the server reply as a file like object.
457 returns the server reply as a file like object.
451 """
458 """
452 raise NotImplementedError()
459 raise NotImplementedError()
453
460
454 def _callpush(self, cmd, fp, **args):
461 def _callpush(self, cmd, fp, **args):
455 """execute a <cmd> on server
462 """execute a <cmd> on server
456
463
457 The command is expected to be related to a push. Push has a special
464 The command is expected to be related to a push. Push has a special
458 return method.
465 return method.
459
466
460 returns the server reply as a (ret, output) tuple. ret is either
467 returns the server reply as a (ret, output) tuple. ret is either
461 empty (error) or a stringified int.
468 empty (error) or a stringified int.
462 """
469 """
463 raise NotImplementedError()
470 raise NotImplementedError()
464
471
465 def _calltwowaystream(self, cmd, fp, **args):
472 def _calltwowaystream(self, cmd, fp, **args):
466 """execute <cmd> on server
473 """execute <cmd> on server
467
474
468 The command will send a stream to the server and get a stream in reply.
475 The command will send a stream to the server and get a stream in reply.
469 """
476 """
470 raise NotImplementedError()
477 raise NotImplementedError()
471
478
472 def _abort(self, exception):
479 def _abort(self, exception):
473 """clearly abort the wire protocol connection and raise the exception
480 """clearly abort the wire protocol connection and raise the exception
474 """
481 """
475 raise NotImplementedError()
482 raise NotImplementedError()
476
483
477 # server side
484 # server side
478
485
479 # wire protocol command can either return a string or one of these classes.
486 # wire protocol command can either return a string or one of these classes.
480 class streamres(object):
481 """wireproto reply: binary stream
482
483 The call was successful and the result is a stream.
484
485 Accepts a generator containing chunks of data to be sent to the client.
486
487 ``prefer_uncompressed`` indicates that the data is expected to be
488 uncompressable and that the stream should therefore use the ``none``
489 engine.
490 """
491 def __init__(self, gen=None, prefer_uncompressed=False):
492 self.gen = gen
493 self.prefer_uncompressed = prefer_uncompressed
494
495 class streamres_legacy(object):
496 """wireproto reply: uncompressed binary stream
497
498 The call was successful and the result is a stream.
499
500 Accepts a generator containing chunks of data to be sent to the client.
501
502 Like ``streamres``, but sends an uncompressed data for "version 1" clients
503 using the application/mercurial-0.1 media type.
504 """
505 def __init__(self, gen=None):
506 self.gen = gen
507
508 class pushres(object):
509 """wireproto reply: success with simple integer return
510
511 The call was successful and returned an integer contained in `self.res`.
512 """
513 def __init__(self, res, output):
514 self.res = res
515 self.output = output
516
517 class pusherr(object):
518 """wireproto reply: failure
519
520 The call failed. The `self.res` attribute contains the error message.
521 """
522 def __init__(self, res, output):
523 self.res = res
524 self.output = output
525
526 class ooberror(object):
527 """wireproto reply: failure of a batch of operation
528
529 Something failed during a batch call. The error message is stored in
530 `self.message`.
531 """
532 def __init__(self, message):
533 self.message = message
534
487
535 def getdispatchrepo(repo, proto, command):
488 def getdispatchrepo(repo, proto, command):
536 """Obtain the repo used for processing wire protocol commands.
489 """Obtain the repo used for processing wire protocol commands.
537
490
538 The intent of this function is to serve as a monkeypatch point for
491 The intent of this function is to serve as a monkeypatch point for
539 extensions that need commands to operate on different repo views under
492 extensions that need commands to operate on different repo views under
540 specialized circumstances.
493 specialized circumstances.
541 """
494 """
542 return repo.filtered('served')
495 return repo.filtered('served')
543
496
544 def dispatch(repo, proto, command):
497 def dispatch(repo, proto, command):
545 repo = getdispatchrepo(repo, proto, command)
498 repo = getdispatchrepo(repo, proto, command)
546 func, spec = commands[command]
499 func, spec = commands[command]
547 args = proto.getargs(spec)
500 args = proto.getargs(spec)
548 return func(repo, proto, *args)
501 return func(repo, proto, *args)
549
502
550 def options(cmd, keys, others):
503 def options(cmd, keys, others):
551 opts = {}
504 opts = {}
552 for k in keys:
505 for k in keys:
553 if k in others:
506 if k in others:
554 opts[k] = others[k]
507 opts[k] = others[k]
555 del others[k]
508 del others[k]
556 if others:
509 if others:
557 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
510 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
558 % (cmd, ",".join(others)))
511 % (cmd, ",".join(others)))
559 return opts
512 return opts
560
513
561 def bundle1allowed(repo, action):
514 def bundle1allowed(repo, action):
562 """Whether a bundle1 operation is allowed from the server.
515 """Whether a bundle1 operation is allowed from the server.
563
516
564 Priority is:
517 Priority is:
565
518
566 1. server.bundle1gd.<action> (if generaldelta active)
519 1. server.bundle1gd.<action> (if generaldelta active)
567 2. server.bundle1.<action>
520 2. server.bundle1.<action>
568 3. server.bundle1gd (if generaldelta active)
521 3. server.bundle1gd (if generaldelta active)
569 4. server.bundle1
522 4. server.bundle1
570 """
523 """
571 ui = repo.ui
524 ui = repo.ui
572 gd = 'generaldelta' in repo.requirements
525 gd = 'generaldelta' in repo.requirements
573
526
574 if gd:
527 if gd:
575 v = ui.configbool('server', 'bundle1gd.%s' % action)
528 v = ui.configbool('server', 'bundle1gd.%s' % action)
576 if v is not None:
529 if v is not None:
577 return v
530 return v
578
531
579 v = ui.configbool('server', 'bundle1.%s' % action)
532 v = ui.configbool('server', 'bundle1.%s' % action)
580 if v is not None:
533 if v is not None:
581 return v
534 return v
582
535
583 if gd:
536 if gd:
584 v = ui.configbool('server', 'bundle1gd')
537 v = ui.configbool('server', 'bundle1gd')
585 if v is not None:
538 if v is not None:
586 return v
539 return v
587
540
588 return ui.configbool('server', 'bundle1')
541 return ui.configbool('server', 'bundle1')
589
542
590 def supportedcompengines(ui, role):
543 def supportedcompengines(ui, role):
591 """Obtain the list of supported compression engines for a request."""
544 """Obtain the list of supported compression engines for a request."""
592 assert role in (util.CLIENTROLE, util.SERVERROLE)
545 assert role in (util.CLIENTROLE, util.SERVERROLE)
593
546
594 compengines = util.compengines.supportedwireengines(role)
547 compengines = util.compengines.supportedwireengines(role)
595
548
596 # Allow config to override default list and ordering.
549 # Allow config to override default list and ordering.
597 if role == util.SERVERROLE:
550 if role == util.SERVERROLE:
598 configengines = ui.configlist('server', 'compressionengines')
551 configengines = ui.configlist('server', 'compressionengines')
599 config = 'server.compressionengines'
552 config = 'server.compressionengines'
600 else:
553 else:
601 # This is currently implemented mainly to facilitate testing. In most
554 # This is currently implemented mainly to facilitate testing. In most
602 # cases, the server should be in charge of choosing a compression engine
555 # cases, the server should be in charge of choosing a compression engine
603 # because a server has the most to lose from a sub-optimal choice. (e.g.
556 # because a server has the most to lose from a sub-optimal choice. (e.g.
604 # CPU DoS due to an expensive engine or a network DoS due to poor
557 # CPU DoS due to an expensive engine or a network DoS due to poor
605 # compression ratio).
558 # compression ratio).
606 configengines = ui.configlist('experimental',
559 configengines = ui.configlist('experimental',
607 'clientcompressionengines')
560 'clientcompressionengines')
608 config = 'experimental.clientcompressionengines'
561 config = 'experimental.clientcompressionengines'
609
562
610 # No explicit config. Filter out the ones that aren't supposed to be
563 # No explicit config. Filter out the ones that aren't supposed to be
611 # advertised and return default ordering.
564 # advertised and return default ordering.
612 if not configengines:
565 if not configengines:
613 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
566 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
614 return [e for e in compengines
567 return [e for e in compengines
615 if getattr(e.wireprotosupport(), attr) > 0]
568 if getattr(e.wireprotosupport(), attr) > 0]
616
569
617 # If compression engines are listed in the config, assume there is a good
570 # If compression engines are listed in the config, assume there is a good
618 # reason for it (like server operators wanting to achieve specific
571 # reason for it (like server operators wanting to achieve specific
619 # performance characteristics). So fail fast if the config references
572 # performance characteristics). So fail fast if the config references
620 # unusable compression engines.
573 # unusable compression engines.
621 validnames = set(e.name() for e in compengines)
574 validnames = set(e.name() for e in compengines)
622 invalidnames = set(e for e in configengines if e not in validnames)
575 invalidnames = set(e for e in configengines if e not in validnames)
623 if invalidnames:
576 if invalidnames:
624 raise error.Abort(_('invalid compression engine defined in %s: %s') %
577 raise error.Abort(_('invalid compression engine defined in %s: %s') %
625 (config, ', '.join(sorted(invalidnames))))
578 (config, ', '.join(sorted(invalidnames))))
626
579
627 compengines = [e for e in compengines if e.name() in configengines]
580 compengines = [e for e in compengines if e.name() in configengines]
628 compengines = sorted(compengines,
581 compengines = sorted(compengines,
629 key=lambda e: configengines.index(e.name()))
582 key=lambda e: configengines.index(e.name()))
630
583
631 if not compengines:
584 if not compengines:
632 raise error.Abort(_('%s config option does not specify any known '
585 raise error.Abort(_('%s config option does not specify any known '
633 'compression engines') % config,
586 'compression engines') % config,
634 hint=_('usable compression engines: %s') %
587 hint=_('usable compression engines: %s') %
635 ', '.sorted(validnames))
588 ', '.sorted(validnames))
636
589
637 return compengines
590 return compengines
638
591
639 class commandentry(object):
592 class commandentry(object):
640 """Represents a declared wire protocol command."""
593 """Represents a declared wire protocol command."""
641 def __init__(self, func, args=''):
594 def __init__(self, func, args=''):
642 self.func = func
595 self.func = func
643 self.args = args
596 self.args = args
644
597
645 def _merge(self, func, args):
598 def _merge(self, func, args):
646 """Merge this instance with an incoming 2-tuple.
599 """Merge this instance with an incoming 2-tuple.
647
600
648 This is called when a caller using the old 2-tuple API attempts
601 This is called when a caller using the old 2-tuple API attempts
649 to replace an instance. The incoming values are merged with
602 to replace an instance. The incoming values are merged with
650 data not captured by the 2-tuple and a new instance containing
603 data not captured by the 2-tuple and a new instance containing
651 the union of the two objects is returned.
604 the union of the two objects is returned.
652 """
605 """
653 return commandentry(func, args)
606 return commandentry(func, args)
654
607
655 # Old code treats instances as 2-tuples. So expose that interface.
608 # Old code treats instances as 2-tuples. So expose that interface.
656 def __iter__(self):
609 def __iter__(self):
657 yield self.func
610 yield self.func
658 yield self.args
611 yield self.args
659
612
660 def __getitem__(self, i):
613 def __getitem__(self, i):
661 if i == 0:
614 if i == 0:
662 return self.func
615 return self.func
663 elif i == 1:
616 elif i == 1:
664 return self.args
617 return self.args
665 else:
618 else:
666 raise IndexError('can only access elements 0 and 1')
619 raise IndexError('can only access elements 0 and 1')
667
620
668 class commanddict(dict):
621 class commanddict(dict):
669 """Container for registered wire protocol commands.
622 """Container for registered wire protocol commands.
670
623
671 It behaves like a dict. But __setitem__ is overwritten to allow silent
624 It behaves like a dict. But __setitem__ is overwritten to allow silent
672 coercion of values from 2-tuples for API compatibility.
625 coercion of values from 2-tuples for API compatibility.
673 """
626 """
674 def __setitem__(self, k, v):
627 def __setitem__(self, k, v):
675 if isinstance(v, commandentry):
628 if isinstance(v, commandentry):
676 pass
629 pass
677 # Cast 2-tuples to commandentry instances.
630 # Cast 2-tuples to commandentry instances.
678 elif isinstance(v, tuple):
631 elif isinstance(v, tuple):
679 if len(v) != 2:
632 if len(v) != 2:
680 raise ValueError('command tuples must have exactly 2 elements')
633 raise ValueError('command tuples must have exactly 2 elements')
681
634
682 # It is common for extensions to wrap wire protocol commands via
635 # It is common for extensions to wrap wire protocol commands via
683 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
636 # e.g. ``wireproto.commands[x] = (newfn, args)``. Because callers
684 # doing this aren't aware of the new API that uses objects to store
637 # doing this aren't aware of the new API that uses objects to store
685 # command entries, we automatically merge old state with new.
638 # command entries, we automatically merge old state with new.
686 if k in self:
639 if k in self:
687 v = self[k]._merge(v[0], v[1])
640 v = self[k]._merge(v[0], v[1])
688 else:
641 else:
689 v = commandentry(v[0], v[1])
642 v = commandentry(v[0], v[1])
690 else:
643 else:
691 raise ValueError('command entries must be commandentry instances '
644 raise ValueError('command entries must be commandentry instances '
692 'or 2-tuples')
645 'or 2-tuples')
693
646
694 return super(commanddict, self).__setitem__(k, v)
647 return super(commanddict, self).__setitem__(k, v)
695
648
696 def commandavailable(self, command, proto):
649 def commandavailable(self, command, proto):
697 """Determine if a command is available for the requested protocol."""
650 """Determine if a command is available for the requested protocol."""
698 # For now, commands are available for all protocols. So do a simple
651 # For now, commands are available for all protocols. So do a simple
699 # membership test.
652 # membership test.
700 return command in self
653 return command in self
701
654
702 commands = commanddict()
655 commands = commanddict()
703
656
704 def wireprotocommand(name, args=''):
657 def wireprotocommand(name, args=''):
705 """Decorator to declare a wire protocol command.
658 """Decorator to declare a wire protocol command.
706
659
707 ``name`` is the name of the wire protocol command being provided.
660 ``name`` is the name of the wire protocol command being provided.
708
661
709 ``args`` is a space-delimited list of named arguments that the command
662 ``args`` is a space-delimited list of named arguments that the command
710 accepts. ``*`` is a special value that says to accept all arguments.
663 accepts. ``*`` is a special value that says to accept all arguments.
711 """
664 """
712 def register(func):
665 def register(func):
713 commands[name] = commandentry(func, args)
666 commands[name] = commandentry(func, args)
714 return func
667 return func
715 return register
668 return register
716
669
717 @wireprotocommand('batch', 'cmds *')
670 @wireprotocommand('batch', 'cmds *')
718 def batch(repo, proto, cmds, others):
671 def batch(repo, proto, cmds, others):
719 repo = repo.filtered("served")
672 repo = repo.filtered("served")
720 res = []
673 res = []
721 for pair in cmds.split(';'):
674 for pair in cmds.split(';'):
722 op, args = pair.split(' ', 1)
675 op, args = pair.split(' ', 1)
723 vals = {}
676 vals = {}
724 for a in args.split(','):
677 for a in args.split(','):
725 if a:
678 if a:
726 n, v = a.split('=')
679 n, v = a.split('=')
727 vals[unescapearg(n)] = unescapearg(v)
680 vals[unescapearg(n)] = unescapearg(v)
728 func, spec = commands[op]
681 func, spec = commands[op]
729 if spec:
682 if spec:
730 keys = spec.split()
683 keys = spec.split()
731 data = {}
684 data = {}
732 for k in keys:
685 for k in keys:
733 if k == '*':
686 if k == '*':
734 star = {}
687 star = {}
735 for key in vals.keys():
688 for key in vals.keys():
736 if key not in keys:
689 if key not in keys:
737 star[key] = vals[key]
690 star[key] = vals[key]
738 data['*'] = star
691 data['*'] = star
739 else:
692 else:
740 data[k] = vals[k]
693 data[k] = vals[k]
741 result = func(repo, proto, *[data[k] for k in keys])
694 result = func(repo, proto, *[data[k] for k in keys])
742 else:
695 else:
743 result = func(repo, proto)
696 result = func(repo, proto)
744 if isinstance(result, ooberror):
697 if isinstance(result, ooberror):
745 return result
698 return result
746 res.append(escapearg(result))
699 res.append(escapearg(result))
747 return ';'.join(res)
700 return ';'.join(res)
748
701
749 @wireprotocommand('between', 'pairs')
702 @wireprotocommand('between', 'pairs')
750 def between(repo, proto, pairs):
703 def between(repo, proto, pairs):
751 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
704 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
752 r = []
705 r = []
753 for b in repo.between(pairs):
706 for b in repo.between(pairs):
754 r.append(encodelist(b) + "\n")
707 r.append(encodelist(b) + "\n")
755 return "".join(r)
708 return "".join(r)
756
709
757 @wireprotocommand('branchmap')
710 @wireprotocommand('branchmap')
758 def branchmap(repo, proto):
711 def branchmap(repo, proto):
759 branchmap = repo.branchmap()
712 branchmap = repo.branchmap()
760 heads = []
713 heads = []
761 for branch, nodes in branchmap.iteritems():
714 for branch, nodes in branchmap.iteritems():
762 branchname = urlreq.quote(encoding.fromlocal(branch))
715 branchname = urlreq.quote(encoding.fromlocal(branch))
763 branchnodes = encodelist(nodes)
716 branchnodes = encodelist(nodes)
764 heads.append('%s %s' % (branchname, branchnodes))
717 heads.append('%s %s' % (branchname, branchnodes))
765 return '\n'.join(heads)
718 return '\n'.join(heads)
766
719
767 @wireprotocommand('branches', 'nodes')
720 @wireprotocommand('branches', 'nodes')
768 def branches(repo, proto, nodes):
721 def branches(repo, proto, nodes):
769 nodes = decodelist(nodes)
722 nodes = decodelist(nodes)
770 r = []
723 r = []
771 for b in repo.branches(nodes):
724 for b in repo.branches(nodes):
772 r.append(encodelist(b) + "\n")
725 r.append(encodelist(b) + "\n")
773 return "".join(r)
726 return "".join(r)
774
727
775 @wireprotocommand('clonebundles', '')
728 @wireprotocommand('clonebundles', '')
776 def clonebundles(repo, proto):
729 def clonebundles(repo, proto):
777 """Server command for returning info for available bundles to seed clones.
730 """Server command for returning info for available bundles to seed clones.
778
731
779 Clients will parse this response and determine what bundle to fetch.
732 Clients will parse this response and determine what bundle to fetch.
780
733
781 Extensions may wrap this command to filter or dynamically emit data
734 Extensions may wrap this command to filter or dynamically emit data
782 depending on the request. e.g. you could advertise URLs for the closest
735 depending on the request. e.g. you could advertise URLs for the closest
783 data center given the client's IP address.
736 data center given the client's IP address.
784 """
737 """
785 return repo.vfs.tryread('clonebundles.manifest')
738 return repo.vfs.tryread('clonebundles.manifest')
786
739
787 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
740 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
788 'known', 'getbundle', 'unbundlehash', 'batch']
741 'known', 'getbundle', 'unbundlehash', 'batch']
789
742
790 def _capabilities(repo, proto):
743 def _capabilities(repo, proto):
791 """return a list of capabilities for a repo
744 """return a list of capabilities for a repo
792
745
793 This function exists to allow extensions to easily wrap capabilities
746 This function exists to allow extensions to easily wrap capabilities
794 computation
747 computation
795
748
796 - returns a lists: easy to alter
749 - returns a lists: easy to alter
797 - change done here will be propagated to both `capabilities` and `hello`
750 - change done here will be propagated to both `capabilities` and `hello`
798 command without any other action needed.
751 command without any other action needed.
799 """
752 """
800 # copy to prevent modification of the global list
753 # copy to prevent modification of the global list
801 caps = list(wireprotocaps)
754 caps = list(wireprotocaps)
802 if streamclone.allowservergeneration(repo):
755 if streamclone.allowservergeneration(repo):
803 if repo.ui.configbool('server', 'preferuncompressed'):
756 if repo.ui.configbool('server', 'preferuncompressed'):
804 caps.append('stream-preferred')
757 caps.append('stream-preferred')
805 requiredformats = repo.requirements & repo.supportedformats
758 requiredformats = repo.requirements & repo.supportedformats
806 # if our local revlogs are just revlogv1, add 'stream' cap
759 # if our local revlogs are just revlogv1, add 'stream' cap
807 if not requiredformats - {'revlogv1'}:
760 if not requiredformats - {'revlogv1'}:
808 caps.append('stream')
761 caps.append('stream')
809 # otherwise, add 'streamreqs' detailing our local revlog format
762 # otherwise, add 'streamreqs' detailing our local revlog format
810 else:
763 else:
811 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
764 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
812 if repo.ui.configbool('experimental', 'bundle2-advertise'):
765 if repo.ui.configbool('experimental', 'bundle2-advertise'):
813 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
766 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
814 caps.append('bundle2=' + urlreq.quote(capsblob))
767 caps.append('bundle2=' + urlreq.quote(capsblob))
815 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
768 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
816
769
817 if proto.name == 'http':
770 if proto.name == 'http':
818 caps.append('httpheader=%d' %
771 caps.append('httpheader=%d' %
819 repo.ui.configint('server', 'maxhttpheaderlen'))
772 repo.ui.configint('server', 'maxhttpheaderlen'))
820 if repo.ui.configbool('experimental', 'httppostargs'):
773 if repo.ui.configbool('experimental', 'httppostargs'):
821 caps.append('httppostargs')
774 caps.append('httppostargs')
822
775
823 # FUTURE advertise 0.2rx once support is implemented
776 # FUTURE advertise 0.2rx once support is implemented
824 # FUTURE advertise minrx and mintx after consulting config option
777 # FUTURE advertise minrx and mintx after consulting config option
825 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
778 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
826
779
827 compengines = supportedcompengines(repo.ui, util.SERVERROLE)
780 compengines = supportedcompengines(repo.ui, util.SERVERROLE)
828 if compengines:
781 if compengines:
829 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
782 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
830 for e in compengines)
783 for e in compengines)
831 caps.append('compression=%s' % comptypes)
784 caps.append('compression=%s' % comptypes)
832
785
833 return caps
786 return caps
834
787
835 # If you are writing an extension and consider wrapping this function. Wrap
788 # If you are writing an extension and consider wrapping this function. Wrap
836 # `_capabilities` instead.
789 # `_capabilities` instead.
837 @wireprotocommand('capabilities')
790 @wireprotocommand('capabilities')
838 def capabilities(repo, proto):
791 def capabilities(repo, proto):
839 return ' '.join(_capabilities(repo, proto))
792 return ' '.join(_capabilities(repo, proto))
840
793
841 @wireprotocommand('changegroup', 'roots')
794 @wireprotocommand('changegroup', 'roots')
842 def changegroup(repo, proto, roots):
795 def changegroup(repo, proto, roots):
843 nodes = decodelist(roots)
796 nodes = decodelist(roots)
844 outgoing = discovery.outgoing(repo, missingroots=nodes,
797 outgoing = discovery.outgoing(repo, missingroots=nodes,
845 missingheads=repo.heads())
798 missingheads=repo.heads())
846 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
799 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
847 gen = iter(lambda: cg.read(32768), '')
800 gen = iter(lambda: cg.read(32768), '')
848 return streamres(gen=gen)
801 return streamres(gen=gen)
849
802
850 @wireprotocommand('changegroupsubset', 'bases heads')
803 @wireprotocommand('changegroupsubset', 'bases heads')
851 def changegroupsubset(repo, proto, bases, heads):
804 def changegroupsubset(repo, proto, bases, heads):
852 bases = decodelist(bases)
805 bases = decodelist(bases)
853 heads = decodelist(heads)
806 heads = decodelist(heads)
854 outgoing = discovery.outgoing(repo, missingroots=bases,
807 outgoing = discovery.outgoing(repo, missingroots=bases,
855 missingheads=heads)
808 missingheads=heads)
856 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
809 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
857 gen = iter(lambda: cg.read(32768), '')
810 gen = iter(lambda: cg.read(32768), '')
858 return streamres(gen=gen)
811 return streamres(gen=gen)
859
812
860 @wireprotocommand('debugwireargs', 'one two *')
813 @wireprotocommand('debugwireargs', 'one two *')
861 def debugwireargs(repo, proto, one, two, others):
814 def debugwireargs(repo, proto, one, two, others):
862 # only accept optional args from the known set
815 # only accept optional args from the known set
863 opts = options('debugwireargs', ['three', 'four'], others)
816 opts = options('debugwireargs', ['three', 'four'], others)
864 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
817 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
865
818
866 @wireprotocommand('getbundle', '*')
819 @wireprotocommand('getbundle', '*')
867 def getbundle(repo, proto, others):
820 def getbundle(repo, proto, others):
868 opts = options('getbundle', gboptsmap.keys(), others)
821 opts = options('getbundle', gboptsmap.keys(), others)
869 for k, v in opts.iteritems():
822 for k, v in opts.iteritems():
870 keytype = gboptsmap[k]
823 keytype = gboptsmap[k]
871 if keytype == 'nodes':
824 if keytype == 'nodes':
872 opts[k] = decodelist(v)
825 opts[k] = decodelist(v)
873 elif keytype == 'csv':
826 elif keytype == 'csv':
874 opts[k] = list(v.split(','))
827 opts[k] = list(v.split(','))
875 elif keytype == 'scsv':
828 elif keytype == 'scsv':
876 opts[k] = set(v.split(','))
829 opts[k] = set(v.split(','))
877 elif keytype == 'boolean':
830 elif keytype == 'boolean':
878 # Client should serialize False as '0', which is a non-empty string
831 # Client should serialize False as '0', which is a non-empty string
879 # so it evaluates as a True bool.
832 # so it evaluates as a True bool.
880 if v == '0':
833 if v == '0':
881 opts[k] = False
834 opts[k] = False
882 else:
835 else:
883 opts[k] = bool(v)
836 opts[k] = bool(v)
884 elif keytype != 'plain':
837 elif keytype != 'plain':
885 raise KeyError('unknown getbundle option type %s'
838 raise KeyError('unknown getbundle option type %s'
886 % keytype)
839 % keytype)
887
840
888 if not bundle1allowed(repo, 'pull'):
841 if not bundle1allowed(repo, 'pull'):
889 if not exchange.bundle2requested(opts.get('bundlecaps')):
842 if not exchange.bundle2requested(opts.get('bundlecaps')):
890 if proto.name == 'http':
843 if proto.name == 'http':
891 return ooberror(bundle2required)
844 return ooberror(bundle2required)
892 raise error.Abort(bundle2requiredmain,
845 raise error.Abort(bundle2requiredmain,
893 hint=bundle2requiredhint)
846 hint=bundle2requiredhint)
894
847
895 prefercompressed = True
848 prefercompressed = True
896
849
897 try:
850 try:
898 if repo.ui.configbool('server', 'disablefullbundle'):
851 if repo.ui.configbool('server', 'disablefullbundle'):
899 # Check to see if this is a full clone.
852 # Check to see if this is a full clone.
900 clheads = set(repo.changelog.heads())
853 clheads = set(repo.changelog.heads())
901 changegroup = opts.get('cg', True)
854 changegroup = opts.get('cg', True)
902 heads = set(opts.get('heads', set()))
855 heads = set(opts.get('heads', set()))
903 common = set(opts.get('common', set()))
856 common = set(opts.get('common', set()))
904 common.discard(nullid)
857 common.discard(nullid)
905 if changegroup and not common and clheads == heads:
858 if changegroup and not common and clheads == heads:
906 raise error.Abort(
859 raise error.Abort(
907 _('server has pull-based clones disabled'),
860 _('server has pull-based clones disabled'),
908 hint=_('remove --pull if specified or upgrade Mercurial'))
861 hint=_('remove --pull if specified or upgrade Mercurial'))
909
862
910 info, chunks = exchange.getbundlechunks(repo, 'serve',
863 info, chunks = exchange.getbundlechunks(repo, 'serve',
911 **pycompat.strkwargs(opts))
864 **pycompat.strkwargs(opts))
912 prefercompressed = info.get('prefercompressed', True)
865 prefercompressed = info.get('prefercompressed', True)
913 except error.Abort as exc:
866 except error.Abort as exc:
914 # cleanly forward Abort error to the client
867 # cleanly forward Abort error to the client
915 if not exchange.bundle2requested(opts.get('bundlecaps')):
868 if not exchange.bundle2requested(opts.get('bundlecaps')):
916 if proto.name == 'http':
869 if proto.name == 'http':
917 return ooberror(str(exc) + '\n')
870 return ooberror(str(exc) + '\n')
918 raise # cannot do better for bundle1 + ssh
871 raise # cannot do better for bundle1 + ssh
919 # bundle2 request expect a bundle2 reply
872 # bundle2 request expect a bundle2 reply
920 bundler = bundle2.bundle20(repo.ui)
873 bundler = bundle2.bundle20(repo.ui)
921 manargs = [('message', str(exc))]
874 manargs = [('message', str(exc))]
922 advargs = []
875 advargs = []
923 if exc.hint is not None:
876 if exc.hint is not None:
924 advargs.append(('hint', exc.hint))
877 advargs.append(('hint', exc.hint))
925 bundler.addpart(bundle2.bundlepart('error:abort',
878 bundler.addpart(bundle2.bundlepart('error:abort',
926 manargs, advargs))
879 manargs, advargs))
927 chunks = bundler.getchunks()
880 chunks = bundler.getchunks()
928 prefercompressed = False
881 prefercompressed = False
929
882
930 return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
883 return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
931
884
932 @wireprotocommand('heads')
885 @wireprotocommand('heads')
933 def heads(repo, proto):
886 def heads(repo, proto):
934 h = repo.heads()
887 h = repo.heads()
935 return encodelist(h) + "\n"
888 return encodelist(h) + "\n"
936
889
937 @wireprotocommand('hello')
890 @wireprotocommand('hello')
938 def hello(repo, proto):
891 def hello(repo, proto):
939 '''the hello command returns a set of lines describing various
892 '''the hello command returns a set of lines describing various
940 interesting things about the server, in an RFC822-like format.
893 interesting things about the server, in an RFC822-like format.
941 Currently the only one defined is "capabilities", which
894 Currently the only one defined is "capabilities", which
942 consists of a line in the form:
895 consists of a line in the form:
943
896
944 capabilities: space separated list of tokens
897 capabilities: space separated list of tokens
945 '''
898 '''
946 return "capabilities: %s\n" % (capabilities(repo, proto))
899 return "capabilities: %s\n" % (capabilities(repo, proto))
947
900
948 @wireprotocommand('listkeys', 'namespace')
901 @wireprotocommand('listkeys', 'namespace')
949 def listkeys(repo, proto, namespace):
902 def listkeys(repo, proto, namespace):
950 d = repo.listkeys(encoding.tolocal(namespace)).items()
903 d = repo.listkeys(encoding.tolocal(namespace)).items()
951 return pushkeymod.encodekeys(d)
904 return pushkeymod.encodekeys(d)
952
905
953 @wireprotocommand('lookup', 'key')
906 @wireprotocommand('lookup', 'key')
954 def lookup(repo, proto, key):
907 def lookup(repo, proto, key):
955 try:
908 try:
956 k = encoding.tolocal(key)
909 k = encoding.tolocal(key)
957 c = repo[k]
910 c = repo[k]
958 r = c.hex()
911 r = c.hex()
959 success = 1
912 success = 1
960 except Exception as inst:
913 except Exception as inst:
961 r = str(inst)
914 r = str(inst)
962 success = 0
915 success = 0
963 return "%d %s\n" % (success, r)
916 return "%d %s\n" % (success, r)
964
917
965 @wireprotocommand('known', 'nodes *')
918 @wireprotocommand('known', 'nodes *')
966 def known(repo, proto, nodes, others):
919 def known(repo, proto, nodes, others):
967 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
920 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
968
921
969 @wireprotocommand('pushkey', 'namespace key old new')
922 @wireprotocommand('pushkey', 'namespace key old new')
970 def pushkey(repo, proto, namespace, key, old, new):
923 def pushkey(repo, proto, namespace, key, old, new):
971 # compatibility with pre-1.8 clients which were accidentally
924 # compatibility with pre-1.8 clients which were accidentally
972 # sending raw binary nodes rather than utf-8-encoded hex
925 # sending raw binary nodes rather than utf-8-encoded hex
973 if len(new) == 20 and util.escapestr(new) != new:
926 if len(new) == 20 and util.escapestr(new) != new:
974 # looks like it could be a binary node
927 # looks like it could be a binary node
975 try:
928 try:
976 new.decode('utf-8')
929 new.decode('utf-8')
977 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
930 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
978 except UnicodeDecodeError:
931 except UnicodeDecodeError:
979 pass # binary, leave unmodified
932 pass # binary, leave unmodified
980 else:
933 else:
981 new = encoding.tolocal(new) # normal path
934 new = encoding.tolocal(new) # normal path
982
935
983 with proto.mayberedirectstdio() as output:
936 with proto.mayberedirectstdio() as output:
984 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
937 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
985 encoding.tolocal(old), new) or False
938 encoding.tolocal(old), new) or False
986
939
987 output = output.getvalue() if output else ''
940 output = output.getvalue() if output else ''
988 return '%s\n%s' % (int(r), output)
941 return '%s\n%s' % (int(r), output)
989
942
990 @wireprotocommand('stream_out')
943 @wireprotocommand('stream_out')
991 def stream(repo, proto):
944 def stream(repo, proto):
992 '''If the server supports streaming clone, it advertises the "stream"
945 '''If the server supports streaming clone, it advertises the "stream"
993 capability with a value representing the version and flags of the repo
946 capability with a value representing the version and flags of the repo
994 it is serving. Client checks to see if it understands the format.
947 it is serving. Client checks to see if it understands the format.
995 '''
948 '''
996 return streamres_legacy(streamclone.generatev1wireproto(repo))
949 return streamres_legacy(streamclone.generatev1wireproto(repo))
997
950
998 @wireprotocommand('unbundle', 'heads')
951 @wireprotocommand('unbundle', 'heads')
999 def unbundle(repo, proto, heads):
952 def unbundle(repo, proto, heads):
1000 their_heads = decodelist(heads)
953 their_heads = decodelist(heads)
1001
954
1002 with proto.mayberedirectstdio() as output:
955 with proto.mayberedirectstdio() as output:
1003 try:
956 try:
1004 exchange.check_heads(repo, their_heads, 'preparing changes')
957 exchange.check_heads(repo, their_heads, 'preparing changes')
1005
958
1006 # write bundle data to temporary file because it can be big
959 # write bundle data to temporary file because it can be big
1007 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
960 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
1008 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
961 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
1009 r = 0
962 r = 0
1010 try:
963 try:
1011 proto.forwardpayload(fp)
964 proto.forwardpayload(fp)
1012 fp.seek(0)
965 fp.seek(0)
1013 gen = exchange.readbundle(repo.ui, fp, None)
966 gen = exchange.readbundle(repo.ui, fp, None)
1014 if (isinstance(gen, changegroupmod.cg1unpacker)
967 if (isinstance(gen, changegroupmod.cg1unpacker)
1015 and not bundle1allowed(repo, 'push')):
968 and not bundle1allowed(repo, 'push')):
1016 if proto.name == 'http':
969 if proto.name == 'http':
1017 # need to special case http because stderr do not get to
970 # need to special case http because stderr do not get to
1018 # the http client on failed push so we need to abuse
971 # the http client on failed push so we need to abuse
1019 # some other error type to make sure the message get to
972 # some other error type to make sure the message get to
1020 # the user.
973 # the user.
1021 return ooberror(bundle2required)
974 return ooberror(bundle2required)
1022 raise error.Abort(bundle2requiredmain,
975 raise error.Abort(bundle2requiredmain,
1023 hint=bundle2requiredhint)
976 hint=bundle2requiredhint)
1024
977
1025 r = exchange.unbundle(repo, gen, their_heads, 'serve',
978 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1026 proto.client())
979 proto.client())
1027 if util.safehasattr(r, 'addpart'):
980 if util.safehasattr(r, 'addpart'):
1028 # The return looks streamable, we are in the bundle2 case
981 # The return looks streamable, we are in the bundle2 case
1029 # and should return a stream.
982 # and should return a stream.
1030 return streamres_legacy(gen=r.getchunks())
983 return streamres_legacy(gen=r.getchunks())
1031 return pushres(r, output.getvalue() if output else '')
984 return pushres(r, output.getvalue() if output else '')
1032
985
1033 finally:
986 finally:
1034 fp.close()
987 fp.close()
1035 os.unlink(tempname)
988 os.unlink(tempname)
1036
989
1037 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
990 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1038 # handle non-bundle2 case first
991 # handle non-bundle2 case first
1039 if not getattr(exc, 'duringunbundle2', False):
992 if not getattr(exc, 'duringunbundle2', False):
1040 try:
993 try:
1041 raise
994 raise
1042 except error.Abort:
995 except error.Abort:
1043 # The old code we moved used util.stderr directly.
996 # The old code we moved used util.stderr directly.
1044 # We did not change it to minimise code change.
997 # We did not change it to minimise code change.
1045 # This need to be moved to something proper.
998 # This need to be moved to something proper.
1046 # Feel free to do it.
999 # Feel free to do it.
1047 util.stderr.write("abort: %s\n" % exc)
1000 util.stderr.write("abort: %s\n" % exc)
1048 if exc.hint is not None:
1001 if exc.hint is not None:
1049 util.stderr.write("(%s)\n" % exc.hint)
1002 util.stderr.write("(%s)\n" % exc.hint)
1050 return pushres(0, output.getvalue() if output else '')
1003 return pushres(0, output.getvalue() if output else '')
1051 except error.PushRaced:
1004 except error.PushRaced:
1052 return pusherr(str(exc),
1005 return pusherr(str(exc),
1053 output.getvalue() if output else '')
1006 output.getvalue() if output else '')
1054
1007
1055 bundler = bundle2.bundle20(repo.ui)
1008 bundler = bundle2.bundle20(repo.ui)
1056 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1009 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1057 bundler.addpart(out)
1010 bundler.addpart(out)
1058 try:
1011 try:
1059 try:
1012 try:
1060 raise
1013 raise
1061 except error.PushkeyFailed as exc:
1014 except error.PushkeyFailed as exc:
1062 # check client caps
1015 # check client caps
1063 remotecaps = getattr(exc, '_replycaps', None)
1016 remotecaps = getattr(exc, '_replycaps', None)
1064 if (remotecaps is not None
1017 if (remotecaps is not None
1065 and 'pushkey' not in remotecaps.get('error', ())):
1018 and 'pushkey' not in remotecaps.get('error', ())):
1066 # no support remote side, fallback to Abort handler.
1019 # no support remote side, fallback to Abort handler.
1067 raise
1020 raise
1068 part = bundler.newpart('error:pushkey')
1021 part = bundler.newpart('error:pushkey')
1069 part.addparam('in-reply-to', exc.partid)
1022 part.addparam('in-reply-to', exc.partid)
1070 if exc.namespace is not None:
1023 if exc.namespace is not None:
1071 part.addparam('namespace', exc.namespace,
1024 part.addparam('namespace', exc.namespace,
1072 mandatory=False)
1025 mandatory=False)
1073 if exc.key is not None:
1026 if exc.key is not None:
1074 part.addparam('key', exc.key, mandatory=False)
1027 part.addparam('key', exc.key, mandatory=False)
1075 if exc.new is not None:
1028 if exc.new is not None:
1076 part.addparam('new', exc.new, mandatory=False)
1029 part.addparam('new', exc.new, mandatory=False)
1077 if exc.old is not None:
1030 if exc.old is not None:
1078 part.addparam('old', exc.old, mandatory=False)
1031 part.addparam('old', exc.old, mandatory=False)
1079 if exc.ret is not None:
1032 if exc.ret is not None:
1080 part.addparam('ret', exc.ret, mandatory=False)
1033 part.addparam('ret', exc.ret, mandatory=False)
1081 except error.BundleValueError as exc:
1034 except error.BundleValueError as exc:
1082 errpart = bundler.newpart('error:unsupportedcontent')
1035 errpart = bundler.newpart('error:unsupportedcontent')
1083 if exc.parttype is not None:
1036 if exc.parttype is not None:
1084 errpart.addparam('parttype', exc.parttype)
1037 errpart.addparam('parttype', exc.parttype)
1085 if exc.params:
1038 if exc.params:
1086 errpart.addparam('params', '\0'.join(exc.params))
1039 errpart.addparam('params', '\0'.join(exc.params))
1087 except error.Abort as exc:
1040 except error.Abort as exc:
1088 manargs = [('message', str(exc))]
1041 manargs = [('message', str(exc))]
1089 advargs = []
1042 advargs = []
1090 if exc.hint is not None:
1043 if exc.hint is not None:
1091 advargs.append(('hint', exc.hint))
1044 advargs.append(('hint', exc.hint))
1092 bundler.addpart(bundle2.bundlepart('error:abort',
1045 bundler.addpart(bundle2.bundlepart('error:abort',
1093 manargs, advargs))
1046 manargs, advargs))
1094 except error.PushRaced as exc:
1047 except error.PushRaced as exc:
1095 bundler.newpart('error:pushraced', [('message', str(exc))])
1048 bundler.newpart('error:pushraced', [('message', str(exc))])
1096 return streamres_legacy(gen=bundler.getchunks())
1049 return streamres_legacy(gen=bundler.getchunks())
@@ -1,453 +1,454 b''
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 #
3 #
4 # This software may be used and distributed according to the terms of the
4 # This software may be used and distributed according to the terms of the
5 # GNU General Public License version 2 or any later version.
5 # GNU General Public License version 2 or any later version.
6
6
7 from __future__ import absolute_import
7 from __future__ import absolute_import
8
8
9 import abc
9 import abc
10 import cgi
10 import cgi
11 import contextlib
11 import contextlib
12 import struct
12 import struct
13 import sys
13 import sys
14
14
15 from .i18n import _
15 from .i18n import _
16 from . import (
16 from . import (
17 encoding,
17 encoding,
18 error,
18 error,
19 hook,
19 hook,
20 pycompat,
20 pycompat,
21 util,
21 util,
22 wireproto,
22 wireproto,
23 wireprototypes,
23 )
24 )
24
25
25 stringio = util.stringio
26 stringio = util.stringio
26
27
27 urlerr = util.urlerr
28 urlerr = util.urlerr
28 urlreq = util.urlreq
29 urlreq = util.urlreq
29
30
30 HTTP_OK = 200
31 HTTP_OK = 200
31
32
32 HGTYPE = 'application/mercurial-0.1'
33 HGTYPE = 'application/mercurial-0.1'
33 HGTYPE2 = 'application/mercurial-0.2'
34 HGTYPE2 = 'application/mercurial-0.2'
34 HGERRTYPE = 'application/hg-error'
35 HGERRTYPE = 'application/hg-error'
35
36
36 # Names of the SSH protocol implementations.
37 # Names of the SSH protocol implementations.
37 SSHV1 = 'ssh-v1'
38 SSHV1 = 'ssh-v1'
38 # This is advertised over the wire. Incremental the counter at the end
39 # This is advertised over the wire. Incremental the counter at the end
39 # to reflect BC breakages.
40 # to reflect BC breakages.
40 SSHV2 = 'exp-ssh-v2-0001'
41 SSHV2 = 'exp-ssh-v2-0001'
41
42
42 class baseprotocolhandler(object):
43 class baseprotocolhandler(object):
43 """Abstract base class for wire protocol handlers.
44 """Abstract base class for wire protocol handlers.
44
45
45 A wire protocol handler serves as an interface between protocol command
46 A wire protocol handler serves as an interface between protocol command
46 handlers and the wire protocol transport layer. Protocol handlers provide
47 handlers and the wire protocol transport layer. Protocol handlers provide
47 methods to read command arguments, redirect stdio for the duration of
48 methods to read command arguments, redirect stdio for the duration of
48 the request, handle response types, etc.
49 the request, handle response types, etc.
49 """
50 """
50
51
51 __metaclass__ = abc.ABCMeta
52 __metaclass__ = abc.ABCMeta
52
53
53 @abc.abstractproperty
54 @abc.abstractproperty
54 def name(self):
55 def name(self):
55 """The name of the protocol implementation.
56 """The name of the protocol implementation.
56
57
57 Used for uniquely identifying the transport type.
58 Used for uniquely identifying the transport type.
58 """
59 """
59
60
60 @abc.abstractmethod
61 @abc.abstractmethod
61 def getargs(self, args):
62 def getargs(self, args):
62 """return the value for arguments in <args>
63 """return the value for arguments in <args>
63
64
64 returns a list of values (same order as <args>)"""
65 returns a list of values (same order as <args>)"""
65
66
66 @abc.abstractmethod
67 @abc.abstractmethod
67 def forwardpayload(self, fp):
68 def forwardpayload(self, fp):
68 """Read the raw payload and forward to a file.
69 """Read the raw payload and forward to a file.
69
70
70 The payload is read in full before the function returns.
71 The payload is read in full before the function returns.
71 """
72 """
72
73
73 @abc.abstractmethod
74 @abc.abstractmethod
74 def mayberedirectstdio(self):
75 def mayberedirectstdio(self):
75 """Context manager to possibly redirect stdio.
76 """Context manager to possibly redirect stdio.
76
77
77 The context manager yields a file-object like object that receives
78 The context manager yields a file-object like object that receives
78 stdout and stderr output when the context manager is active. Or it
79 stdout and stderr output when the context manager is active. Or it
79 yields ``None`` if no I/O redirection occurs.
80 yields ``None`` if no I/O redirection occurs.
80
81
81 The intent of this context manager is to capture stdio output
82 The intent of this context manager is to capture stdio output
82 so it may be sent in the response. Some transports support streaming
83 so it may be sent in the response. Some transports support streaming
83 stdio to the client in real time. For these transports, stdio output
84 stdio to the client in real time. For these transports, stdio output
84 won't be captured.
85 won't be captured.
85 """
86 """
86
87
87 @abc.abstractmethod
88 @abc.abstractmethod
88 def client(self):
89 def client(self):
89 """Returns a string representation of this client (as bytes)."""
90 """Returns a string representation of this client (as bytes)."""
90
91
91 def decodevaluefromheaders(req, headerprefix):
92 def decodevaluefromheaders(req, headerprefix):
92 """Decode a long value from multiple HTTP request headers.
93 """Decode a long value from multiple HTTP request headers.
93
94
94 Returns the value as a bytes, not a str.
95 Returns the value as a bytes, not a str.
95 """
96 """
96 chunks = []
97 chunks = []
97 i = 1
98 i = 1
98 prefix = headerprefix.upper().replace(r'-', r'_')
99 prefix = headerprefix.upper().replace(r'-', r'_')
99 while True:
100 while True:
100 v = req.env.get(r'HTTP_%s_%d' % (prefix, i))
101 v = req.env.get(r'HTTP_%s_%d' % (prefix, i))
101 if v is None:
102 if v is None:
102 break
103 break
103 chunks.append(pycompat.bytesurl(v))
104 chunks.append(pycompat.bytesurl(v))
104 i += 1
105 i += 1
105
106
106 return ''.join(chunks)
107 return ''.join(chunks)
107
108
108 class webproto(baseprotocolhandler):
109 class webproto(baseprotocolhandler):
109 def __init__(self, req, ui):
110 def __init__(self, req, ui):
110 self._req = req
111 self._req = req
111 self._ui = ui
112 self._ui = ui
112
113
113 @property
114 @property
114 def name(self):
115 def name(self):
115 return 'http'
116 return 'http'
116
117
117 def getargs(self, args):
118 def getargs(self, args):
118 knownargs = self._args()
119 knownargs = self._args()
119 data = {}
120 data = {}
120 keys = args.split()
121 keys = args.split()
121 for k in keys:
122 for k in keys:
122 if k == '*':
123 if k == '*':
123 star = {}
124 star = {}
124 for key in knownargs.keys():
125 for key in knownargs.keys():
125 if key != 'cmd' and key not in keys:
126 if key != 'cmd' and key not in keys:
126 star[key] = knownargs[key][0]
127 star[key] = knownargs[key][0]
127 data['*'] = star
128 data['*'] = star
128 else:
129 else:
129 data[k] = knownargs[k][0]
130 data[k] = knownargs[k][0]
130 return [data[k] for k in keys]
131 return [data[k] for k in keys]
131
132
132 def _args(self):
133 def _args(self):
133 args = util.rapply(pycompat.bytesurl, self._req.form.copy())
134 args = util.rapply(pycompat.bytesurl, self._req.form.copy())
134 postlen = int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
135 postlen = int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
135 if postlen:
136 if postlen:
136 args.update(cgi.parse_qs(
137 args.update(cgi.parse_qs(
137 self._req.read(postlen), keep_blank_values=True))
138 self._req.read(postlen), keep_blank_values=True))
138 return args
139 return args
139
140
140 argvalue = decodevaluefromheaders(self._req, r'X-HgArg')
141 argvalue = decodevaluefromheaders(self._req, r'X-HgArg')
141 args.update(cgi.parse_qs(argvalue, keep_blank_values=True))
142 args.update(cgi.parse_qs(argvalue, keep_blank_values=True))
142 return args
143 return args
143
144
144 def forwardpayload(self, fp):
145 def forwardpayload(self, fp):
145 length = int(self._req.env[r'CONTENT_LENGTH'])
146 length = int(self._req.env[r'CONTENT_LENGTH'])
146 # If httppostargs is used, we need to read Content-Length
147 # If httppostargs is used, we need to read Content-Length
147 # minus the amount that was consumed by args.
148 # minus the amount that was consumed by args.
148 length -= int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
149 length -= int(self._req.env.get(r'HTTP_X_HGARGS_POST', 0))
149 for s in util.filechunkiter(self._req, limit=length):
150 for s in util.filechunkiter(self._req, limit=length):
150 fp.write(s)
151 fp.write(s)
151
152
152 @contextlib.contextmanager
153 @contextlib.contextmanager
153 def mayberedirectstdio(self):
154 def mayberedirectstdio(self):
154 oldout = self._ui.fout
155 oldout = self._ui.fout
155 olderr = self._ui.ferr
156 olderr = self._ui.ferr
156
157
157 out = util.stringio()
158 out = util.stringio()
158
159
159 try:
160 try:
160 self._ui.fout = out
161 self._ui.fout = out
161 self._ui.ferr = out
162 self._ui.ferr = out
162 yield out
163 yield out
163 finally:
164 finally:
164 self._ui.fout = oldout
165 self._ui.fout = oldout
165 self._ui.ferr = olderr
166 self._ui.ferr = olderr
166
167
167 def client(self):
168 def client(self):
168 return 'remote:%s:%s:%s' % (
169 return 'remote:%s:%s:%s' % (
169 self._req.env.get('wsgi.url_scheme') or 'http',
170 self._req.env.get('wsgi.url_scheme') or 'http',
170 urlreq.quote(self._req.env.get('REMOTE_HOST', '')),
171 urlreq.quote(self._req.env.get('REMOTE_HOST', '')),
171 urlreq.quote(self._req.env.get('REMOTE_USER', '')))
172 urlreq.quote(self._req.env.get('REMOTE_USER', '')))
172
173
173 def iscmd(cmd):
174 def iscmd(cmd):
174 return cmd in wireproto.commands
175 return cmd in wireproto.commands
175
176
176 def parsehttprequest(repo, req, query):
177 def parsehttprequest(repo, req, query):
177 """Parse the HTTP request for a wire protocol request.
178 """Parse the HTTP request for a wire protocol request.
178
179
179 If the current request appears to be a wire protocol request, this
180 If the current request appears to be a wire protocol request, this
180 function returns a dict with details about that request, including
181 function returns a dict with details about that request, including
181 an ``abstractprotocolserver`` instance suitable for handling the
182 an ``abstractprotocolserver`` instance suitable for handling the
182 request. Otherwise, ``None`` is returned.
183 request. Otherwise, ``None`` is returned.
183
184
184 ``req`` is a ``wsgirequest`` instance.
185 ``req`` is a ``wsgirequest`` instance.
185 """
186 """
186 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
187 # HTTP version 1 wire protocol requests are denoted by a "cmd" query
187 # string parameter. If it isn't present, this isn't a wire protocol
188 # string parameter. If it isn't present, this isn't a wire protocol
188 # request.
189 # request.
189 if r'cmd' not in req.form:
190 if r'cmd' not in req.form:
190 return None
191 return None
191
192
192 cmd = pycompat.sysbytes(req.form[r'cmd'][0])
193 cmd = pycompat.sysbytes(req.form[r'cmd'][0])
193
194
194 # The "cmd" request parameter is used by both the wire protocol and hgweb.
195 # The "cmd" request parameter is used by both the wire protocol and hgweb.
195 # While not all wire protocol commands are available for all transports,
196 # While not all wire protocol commands are available for all transports,
196 # if we see a "cmd" value that resembles a known wire protocol command, we
197 # if we see a "cmd" value that resembles a known wire protocol command, we
197 # route it to a protocol handler. This is better than routing possible
198 # route it to a protocol handler. This is better than routing possible
198 # wire protocol requests to hgweb because it prevents hgweb from using
199 # wire protocol requests to hgweb because it prevents hgweb from using
199 # known wire protocol commands and it is less confusing for machine
200 # known wire protocol commands and it is less confusing for machine
200 # clients.
201 # clients.
201 if cmd not in wireproto.commands:
202 if cmd not in wireproto.commands:
202 return None
203 return None
203
204
204 proto = webproto(req, repo.ui)
205 proto = webproto(req, repo.ui)
205
206
206 return {
207 return {
207 'cmd': cmd,
208 'cmd': cmd,
208 'proto': proto,
209 'proto': proto,
209 'dispatch': lambda: _callhttp(repo, req, proto, cmd),
210 'dispatch': lambda: _callhttp(repo, req, proto, cmd),
210 'handleerror': lambda ex: _handlehttperror(ex, req, cmd),
211 'handleerror': lambda ex: _handlehttperror(ex, req, cmd),
211 }
212 }
212
213
213 def _httpresponsetype(ui, req, prefer_uncompressed):
214 def _httpresponsetype(ui, req, prefer_uncompressed):
214 """Determine the appropriate response type and compression settings.
215 """Determine the appropriate response type and compression settings.
215
216
216 Returns a tuple of (mediatype, compengine, engineopts).
217 Returns a tuple of (mediatype, compengine, engineopts).
217 """
218 """
218 # Determine the response media type and compression engine based
219 # Determine the response media type and compression engine based
219 # on the request parameters.
220 # on the request parameters.
220 protocaps = decodevaluefromheaders(req, r'X-HgProto').split(' ')
221 protocaps = decodevaluefromheaders(req, r'X-HgProto').split(' ')
221
222
222 if '0.2' in protocaps:
223 if '0.2' in protocaps:
223 # All clients are expected to support uncompressed data.
224 # All clients are expected to support uncompressed data.
224 if prefer_uncompressed:
225 if prefer_uncompressed:
225 return HGTYPE2, util._noopengine(), {}
226 return HGTYPE2, util._noopengine(), {}
226
227
227 # Default as defined by wire protocol spec.
228 # Default as defined by wire protocol spec.
228 compformats = ['zlib', 'none']
229 compformats = ['zlib', 'none']
229 for cap in protocaps:
230 for cap in protocaps:
230 if cap.startswith('comp='):
231 if cap.startswith('comp='):
231 compformats = cap[5:].split(',')
232 compformats = cap[5:].split(',')
232 break
233 break
233
234
234 # Now find an agreed upon compression format.
235 # Now find an agreed upon compression format.
235 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
236 for engine in wireproto.supportedcompengines(ui, util.SERVERROLE):
236 if engine.wireprotosupport().name in compformats:
237 if engine.wireprotosupport().name in compformats:
237 opts = {}
238 opts = {}
238 level = ui.configint('server', '%slevel' % engine.name())
239 level = ui.configint('server', '%slevel' % engine.name())
239 if level is not None:
240 if level is not None:
240 opts['level'] = level
241 opts['level'] = level
241
242
242 return HGTYPE2, engine, opts
243 return HGTYPE2, engine, opts
243
244
244 # No mutually supported compression format. Fall back to the
245 # No mutually supported compression format. Fall back to the
245 # legacy protocol.
246 # legacy protocol.
246
247
247 # Don't allow untrusted settings because disabling compression or
248 # Don't allow untrusted settings because disabling compression or
248 # setting a very high compression level could lead to flooding
249 # setting a very high compression level could lead to flooding
249 # the server's network or CPU.
250 # the server's network or CPU.
250 opts = {'level': ui.configint('server', 'zliblevel')}
251 opts = {'level': ui.configint('server', 'zliblevel')}
251 return HGTYPE, util.compengines['zlib'], opts
252 return HGTYPE, util.compengines['zlib'], opts
252
253
253 def _callhttp(repo, req, proto, cmd):
254 def _callhttp(repo, req, proto, cmd):
254 def genversion2(gen, engine, engineopts):
255 def genversion2(gen, engine, engineopts):
255 # application/mercurial-0.2 always sends a payload header
256 # application/mercurial-0.2 always sends a payload header
256 # identifying the compression engine.
257 # identifying the compression engine.
257 name = engine.wireprotosupport().name
258 name = engine.wireprotosupport().name
258 assert 0 < len(name) < 256
259 assert 0 < len(name) < 256
259 yield struct.pack('B', len(name))
260 yield struct.pack('B', len(name))
260 yield name
261 yield name
261
262
262 for chunk in gen:
263 for chunk in gen:
263 yield chunk
264 yield chunk
264
265
265 rsp = wireproto.dispatch(repo, proto, cmd)
266 rsp = wireproto.dispatch(repo, proto, cmd)
266
267
267 if not wireproto.commands.commandavailable(cmd, proto):
268 if not wireproto.commands.commandavailable(cmd, proto):
268 req.respond(HTTP_OK, HGERRTYPE,
269 req.respond(HTTP_OK, HGERRTYPE,
269 body=_('requested wire protocol command is not available '
270 body=_('requested wire protocol command is not available '
270 'over HTTP'))
271 'over HTTP'))
271 return []
272 return []
272
273
273 if isinstance(rsp, bytes):
274 if isinstance(rsp, bytes):
274 req.respond(HTTP_OK, HGTYPE, body=rsp)
275 req.respond(HTTP_OK, HGTYPE, body=rsp)
275 return []
276 return []
276 elif isinstance(rsp, wireproto.streamres_legacy):
277 elif isinstance(rsp, wireprototypes.streamreslegacy):
277 gen = rsp.gen
278 gen = rsp.gen
278 req.respond(HTTP_OK, HGTYPE)
279 req.respond(HTTP_OK, HGTYPE)
279 return gen
280 return gen
280 elif isinstance(rsp, wireproto.streamres):
281 elif isinstance(rsp, wireprototypes.streamres):
281 gen = rsp.gen
282 gen = rsp.gen
282
283
283 # This code for compression should not be streamres specific. It
284 # This code for compression should not be streamres specific. It
284 # is here because we only compress streamres at the moment.
285 # is here because we only compress streamres at the moment.
285 mediatype, engine, engineopts = _httpresponsetype(
286 mediatype, engine, engineopts = _httpresponsetype(
286 repo.ui, req, rsp.prefer_uncompressed)
287 repo.ui, req, rsp.prefer_uncompressed)
287 gen = engine.compressstream(gen, engineopts)
288 gen = engine.compressstream(gen, engineopts)
288
289
289 if mediatype == HGTYPE2:
290 if mediatype == HGTYPE2:
290 gen = genversion2(gen, engine, engineopts)
291 gen = genversion2(gen, engine, engineopts)
291
292
292 req.respond(HTTP_OK, mediatype)
293 req.respond(HTTP_OK, mediatype)
293 return gen
294 return gen
294 elif isinstance(rsp, wireproto.pushres):
295 elif isinstance(rsp, wireprototypes.pushres):
295 rsp = '%d\n%s' % (rsp.res, rsp.output)
296 rsp = '%d\n%s' % (rsp.res, rsp.output)
296 req.respond(HTTP_OK, HGTYPE, body=rsp)
297 req.respond(HTTP_OK, HGTYPE, body=rsp)
297 return []
298 return []
298 elif isinstance(rsp, wireproto.pusherr):
299 elif isinstance(rsp, wireprototypes.pusherr):
299 # This is the httplib workaround documented in _handlehttperror().
300 # This is the httplib workaround documented in _handlehttperror().
300 req.drain()
301 req.drain()
301
302
302 rsp = '0\n%s\n' % rsp.res
303 rsp = '0\n%s\n' % rsp.res
303 req.respond(HTTP_OK, HGTYPE, body=rsp)
304 req.respond(HTTP_OK, HGTYPE, body=rsp)
304 return []
305 return []
305 elif isinstance(rsp, wireproto.ooberror):
306 elif isinstance(rsp, wireprototypes.ooberror):
306 rsp = rsp.message
307 rsp = rsp.message
307 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
308 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
308 return []
309 return []
309 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
310 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
310
311
311 def _handlehttperror(e, req, cmd):
312 def _handlehttperror(e, req, cmd):
312 """Called when an ErrorResponse is raised during HTTP request processing."""
313 """Called when an ErrorResponse is raised during HTTP request processing."""
313
314
314 # Clients using Python's httplib are stateful: the HTTP client
315 # Clients using Python's httplib are stateful: the HTTP client
315 # won't process an HTTP response until all request data is
316 # won't process an HTTP response until all request data is
316 # sent to the server. The intent of this code is to ensure
317 # sent to the server. The intent of this code is to ensure
317 # we always read HTTP request data from the client, thus
318 # we always read HTTP request data from the client, thus
318 # ensuring httplib transitions to a state that allows it to read
319 # ensuring httplib transitions to a state that allows it to read
319 # the HTTP response. In other words, it helps prevent deadlocks
320 # the HTTP response. In other words, it helps prevent deadlocks
320 # on clients using httplib.
321 # on clients using httplib.
321
322
322 if (req.env[r'REQUEST_METHOD'] == r'POST' and
323 if (req.env[r'REQUEST_METHOD'] == r'POST' and
323 # But not if Expect: 100-continue is being used.
324 # But not if Expect: 100-continue is being used.
324 (req.env.get('HTTP_EXPECT',
325 (req.env.get('HTTP_EXPECT',
325 '').lower() != '100-continue') or
326 '').lower() != '100-continue') or
326 # Or the non-httplib HTTP library is being advertised by
327 # Or the non-httplib HTTP library is being advertised by
327 # the client.
328 # the client.
328 req.env.get('X-HgHttp2', '')):
329 req.env.get('X-HgHttp2', '')):
329 req.drain()
330 req.drain()
330 else:
331 else:
331 req.headers.append((r'Connection', r'Close'))
332 req.headers.append((r'Connection', r'Close'))
332
333
333 # TODO This response body assumes the failed command was
334 # TODO This response body assumes the failed command was
334 # "unbundle." That assumption is not always valid.
335 # "unbundle." That assumption is not always valid.
335 req.respond(e, HGTYPE, body='0\n%s\n' % e)
336 req.respond(e, HGTYPE, body='0\n%s\n' % e)
336
337
337 return ''
338 return ''
338
339
339 def _sshv1respondbytes(fout, value):
340 def _sshv1respondbytes(fout, value):
340 """Send a bytes response for protocol version 1."""
341 """Send a bytes response for protocol version 1."""
341 fout.write('%d\n' % len(value))
342 fout.write('%d\n' % len(value))
342 fout.write(value)
343 fout.write(value)
343 fout.flush()
344 fout.flush()
344
345
345 def _sshv1respondstream(fout, source):
346 def _sshv1respondstream(fout, source):
346 write = fout.write
347 write = fout.write
347 for chunk in source.gen:
348 for chunk in source.gen:
348 write(chunk)
349 write(chunk)
349 fout.flush()
350 fout.flush()
350
351
351 def _sshv1respondooberror(fout, ferr, rsp):
352 def _sshv1respondooberror(fout, ferr, rsp):
352 ferr.write(b'%s\n-\n' % rsp)
353 ferr.write(b'%s\n-\n' % rsp)
353 ferr.flush()
354 ferr.flush()
354 fout.write(b'\n')
355 fout.write(b'\n')
355 fout.flush()
356 fout.flush()
356
357
357 class sshv1protocolhandler(baseprotocolhandler):
358 class sshv1protocolhandler(baseprotocolhandler):
358 """Handler for requests services via version 1 of SSH protocol."""
359 """Handler for requests services via version 1 of SSH protocol."""
359 def __init__(self, ui, fin, fout):
360 def __init__(self, ui, fin, fout):
360 self._ui = ui
361 self._ui = ui
361 self._fin = fin
362 self._fin = fin
362 self._fout = fout
363 self._fout = fout
363
364
364 @property
365 @property
365 def name(self):
366 def name(self):
366 return 'ssh'
367 return 'ssh'
367
368
368 def getargs(self, args):
369 def getargs(self, args):
369 data = {}
370 data = {}
370 keys = args.split()
371 keys = args.split()
371 for n in xrange(len(keys)):
372 for n in xrange(len(keys)):
372 argline = self._fin.readline()[:-1]
373 argline = self._fin.readline()[:-1]
373 arg, l = argline.split()
374 arg, l = argline.split()
374 if arg not in keys:
375 if arg not in keys:
375 raise error.Abort(_("unexpected parameter %r") % arg)
376 raise error.Abort(_("unexpected parameter %r") % arg)
376 if arg == '*':
377 if arg == '*':
377 star = {}
378 star = {}
378 for k in xrange(int(l)):
379 for k in xrange(int(l)):
379 argline = self._fin.readline()[:-1]
380 argline = self._fin.readline()[:-1]
380 arg, l = argline.split()
381 arg, l = argline.split()
381 val = self._fin.read(int(l))
382 val = self._fin.read(int(l))
382 star[arg] = val
383 star[arg] = val
383 data['*'] = star
384 data['*'] = star
384 else:
385 else:
385 val = self._fin.read(int(l))
386 val = self._fin.read(int(l))
386 data[arg] = val
387 data[arg] = val
387 return [data[k] for k in keys]
388 return [data[k] for k in keys]
388
389
389 def forwardpayload(self, fpout):
390 def forwardpayload(self, fpout):
390 # The file is in the form:
391 # The file is in the form:
391 #
392 #
392 # <chunk size>\n<chunk>
393 # <chunk size>\n<chunk>
393 # ...
394 # ...
394 # 0\n
395 # 0\n
395 _sshv1respondbytes(self._fout, b'')
396 _sshv1respondbytes(self._fout, b'')
396 count = int(self._fin.readline())
397 count = int(self._fin.readline())
397 while count:
398 while count:
398 fpout.write(self._fin.read(count))
399 fpout.write(self._fin.read(count))
399 count = int(self._fin.readline())
400 count = int(self._fin.readline())
400
401
401 @contextlib.contextmanager
402 @contextlib.contextmanager
402 def mayberedirectstdio(self):
403 def mayberedirectstdio(self):
403 yield None
404 yield None
404
405
405 def client(self):
406 def client(self):
406 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
407 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
407 return 'remote:ssh:' + client
408 return 'remote:ssh:' + client
408
409
409 class sshserver(object):
410 class sshserver(object):
410 def __init__(self, ui, repo):
411 def __init__(self, ui, repo):
411 self._ui = ui
412 self._ui = ui
412 self._repo = repo
413 self._repo = repo
413 self._fin = ui.fin
414 self._fin = ui.fin
414 self._fout = ui.fout
415 self._fout = ui.fout
415
416
416 hook.redirect(True)
417 hook.redirect(True)
417 ui.fout = repo.ui.fout = ui.ferr
418 ui.fout = repo.ui.fout = ui.ferr
418
419
419 # Prevent insertion/deletion of CRs
420 # Prevent insertion/deletion of CRs
420 util.setbinary(self._fin)
421 util.setbinary(self._fin)
421 util.setbinary(self._fout)
422 util.setbinary(self._fout)
422
423
423 self._proto = sshv1protocolhandler(self._ui, self._fin, self._fout)
424 self._proto = sshv1protocolhandler(self._ui, self._fin, self._fout)
424
425
425 def serve_forever(self):
426 def serve_forever(self):
426 while self.serve_one():
427 while self.serve_one():
427 pass
428 pass
428 sys.exit(0)
429 sys.exit(0)
429
430
430 def serve_one(self):
431 def serve_one(self):
431 cmd = self._fin.readline()[:-1]
432 cmd = self._fin.readline()[:-1]
432 if cmd and wireproto.commands.commandavailable(cmd, self._proto):
433 if cmd and wireproto.commands.commandavailable(cmd, self._proto):
433 rsp = wireproto.dispatch(self._repo, self._proto, cmd)
434 rsp = wireproto.dispatch(self._repo, self._proto, cmd)
434
435
435 if isinstance(rsp, bytes):
436 if isinstance(rsp, bytes):
436 _sshv1respondbytes(self._fout, rsp)
437 _sshv1respondbytes(self._fout, rsp)
437 elif isinstance(rsp, wireproto.streamres):
438 elif isinstance(rsp, wireprototypes.streamres):
438 _sshv1respondstream(self._fout, rsp)
439 _sshv1respondstream(self._fout, rsp)
439 elif isinstance(rsp, wireproto.streamres_legacy):
440 elif isinstance(rsp, wireprototypes.streamreslegacy):
440 _sshv1respondstream(self._fout, rsp)
441 _sshv1respondstream(self._fout, rsp)
441 elif isinstance(rsp, wireproto.pushres):
442 elif isinstance(rsp, wireprototypes.pushres):
442 _sshv1respondbytes(self._fout, b'')
443 _sshv1respondbytes(self._fout, b'')
443 _sshv1respondbytes(self._fout, bytes(rsp.res))
444 _sshv1respondbytes(self._fout, bytes(rsp.res))
444 elif isinstance(rsp, wireproto.pusherr):
445 elif isinstance(rsp, wireprototypes.pusherr):
445 _sshv1respondbytes(self._fout, rsp.res)
446 _sshv1respondbytes(self._fout, rsp.res)
446 elif isinstance(rsp, wireproto.ooberror):
447 elif isinstance(rsp, wireprototypes.ooberror):
447 _sshv1respondooberror(self._fout, self._ui.ferr, rsp.message)
448 _sshv1respondooberror(self._fout, self._ui.ferr, rsp.message)
448 else:
449 else:
449 raise error.ProgrammingError('unhandled response type from '
450 raise error.ProgrammingError('unhandled response type from '
450 'wire protocol command: %s' % rsp)
451 'wire protocol command: %s' % rsp)
451 elif cmd:
452 elif cmd:
452 _sshv1respondbytes(self._fout, b'')
453 _sshv1respondbytes(self._fout, b'')
453 return cmd != ''
454 return cmd != ''
General Comments 0
You need to be logged in to leave comments. Login now