##// END OF EJS Templates
wireprotoserver: move abstractserverproto class from wireproto...
Gregory Szorc -
r35878:d9e71cce default
parent child Browse files
Show More
@@ -1,1076 +1,1036 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import os
11 import os
12 import tempfile
12 import tempfile
13
13
14 from .i18n import _
14 from .i18n import _
15 from .node import (
15 from .node import (
16 bin,
16 bin,
17 hex,
17 hex,
18 nullid,
18 nullid,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 discovery,
24 discovery,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 pycompat,
30 pycompat,
31 repository,
31 repository,
32 streamclone,
32 streamclone,
33 util,
33 util,
34 )
34 )
35
35
36 urlerr = util.urlerr
36 urlerr = util.urlerr
37 urlreq = util.urlreq
37 urlreq = util.urlreq
38
38
39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
39 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
40 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
41 'IncompatibleClient')
41 'IncompatibleClient')
42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
42 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
43
43
44 class abstractserverproto(object):
45 """abstract class that summarizes the protocol API
46
47 Used as reference and documentation.
48 """
49
50 def getargs(self, args):
51 """return the value for arguments in <args>
52
53 returns a list of values (same order as <args>)"""
54 raise NotImplementedError()
55
56 def getfile(self, fp):
57 """write the whole content of a file into a file like object
58
59 The file is in the form::
60
61 (<chunk-size>\n<chunk>)+0\n
62
63 chunk size is the ascii version of the int.
64 """
65 raise NotImplementedError()
66
67 def redirect(self):
68 """may setup interception for stdout and stderr
69
70 See also the `restore` method."""
71 raise NotImplementedError()
72
73 # If the `redirect` function does install interception, the `restore`
74 # function MUST be defined. If interception is not used, this function
75 # MUST NOT be defined.
76 #
77 # left commented here on purpose
78 #
79 #def restore(self):
80 # """reinstall previous stdout and stderr and return intercepted stdout
81 # """
82 # raise NotImplementedError()
83
84 class remoteiterbatcher(peer.iterbatcher):
44 class remoteiterbatcher(peer.iterbatcher):
85 def __init__(self, remote):
45 def __init__(self, remote):
86 super(remoteiterbatcher, self).__init__()
46 super(remoteiterbatcher, self).__init__()
87 self._remote = remote
47 self._remote = remote
88
48
89 def __getattr__(self, name):
49 def __getattr__(self, name):
90 # Validate this method is batchable, since submit() only supports
50 # Validate this method is batchable, since submit() only supports
91 # batchable methods.
51 # batchable methods.
92 fn = getattr(self._remote, name)
52 fn = getattr(self._remote, name)
93 if not getattr(fn, 'batchable', None):
53 if not getattr(fn, 'batchable', None):
94 raise error.ProgrammingError('Attempted to batch a non-batchable '
54 raise error.ProgrammingError('Attempted to batch a non-batchable '
95 'call to %r' % name)
55 'call to %r' % name)
96
56
97 return super(remoteiterbatcher, self).__getattr__(name)
57 return super(remoteiterbatcher, self).__getattr__(name)
98
58
99 def submit(self):
59 def submit(self):
100 """Break the batch request into many patch calls and pipeline them.
60 """Break the batch request into many patch calls and pipeline them.
101
61
102 This is mostly valuable over http where request sizes can be
62 This is mostly valuable over http where request sizes can be
103 limited, but can be used in other places as well.
63 limited, but can be used in other places as well.
104 """
64 """
105 # 2-tuple of (command, arguments) that represents what will be
65 # 2-tuple of (command, arguments) that represents what will be
106 # sent over the wire.
66 # sent over the wire.
107 requests = []
67 requests = []
108
68
109 # 4-tuple of (command, final future, @batchable generator, remote
69 # 4-tuple of (command, final future, @batchable generator, remote
110 # future).
70 # future).
111 results = []
71 results = []
112
72
113 for command, args, opts, finalfuture in self.calls:
73 for command, args, opts, finalfuture in self.calls:
114 mtd = getattr(self._remote, command)
74 mtd = getattr(self._remote, command)
115 batchable = mtd.batchable(mtd.__self__, *args, **opts)
75 batchable = mtd.batchable(mtd.__self__, *args, **opts)
116
76
117 commandargs, fremote = next(batchable)
77 commandargs, fremote = next(batchable)
118 assert fremote
78 assert fremote
119 requests.append((command, commandargs))
79 requests.append((command, commandargs))
120 results.append((command, finalfuture, batchable, fremote))
80 results.append((command, finalfuture, batchable, fremote))
121
81
122 if requests:
82 if requests:
123 self._resultiter = self._remote._submitbatch(requests)
83 self._resultiter = self._remote._submitbatch(requests)
124
84
125 self._results = results
85 self._results = results
126
86
127 def results(self):
87 def results(self):
128 for command, finalfuture, batchable, remotefuture in self._results:
88 for command, finalfuture, batchable, remotefuture in self._results:
129 # Get the raw result, set it in the remote future, feed it
89 # Get the raw result, set it in the remote future, feed it
130 # back into the @batchable generator so it can be decoded, and
90 # back into the @batchable generator so it can be decoded, and
131 # set the result on the final future to this value.
91 # set the result on the final future to this value.
132 remoteresult = next(self._resultiter)
92 remoteresult = next(self._resultiter)
133 remotefuture.set(remoteresult)
93 remotefuture.set(remoteresult)
134 finalfuture.set(next(batchable))
94 finalfuture.set(next(batchable))
135
95
136 # Verify our @batchable generators only emit 2 values.
96 # Verify our @batchable generators only emit 2 values.
137 try:
97 try:
138 next(batchable)
98 next(batchable)
139 except StopIteration:
99 except StopIteration:
140 pass
100 pass
141 else:
101 else:
142 raise error.ProgrammingError('%s @batchable generator emitted '
102 raise error.ProgrammingError('%s @batchable generator emitted '
143 'unexpected value count' % command)
103 'unexpected value count' % command)
144
104
145 yield finalfuture.value
105 yield finalfuture.value
146
106
147 # Forward a couple of names from peer to make wireproto interactions
107 # Forward a couple of names from peer to make wireproto interactions
148 # slightly more sensible.
108 # slightly more sensible.
149 batchable = peer.batchable
109 batchable = peer.batchable
150 future = peer.future
110 future = peer.future
151
111
152 # list of nodes encoding / decoding
112 # list of nodes encoding / decoding
153
113
154 def decodelist(l, sep=' '):
114 def decodelist(l, sep=' '):
155 if l:
115 if l:
156 return [bin(v) for v in l.split(sep)]
116 return [bin(v) for v in l.split(sep)]
157 return []
117 return []
158
118
159 def encodelist(l, sep=' '):
119 def encodelist(l, sep=' '):
160 try:
120 try:
161 return sep.join(map(hex, l))
121 return sep.join(map(hex, l))
162 except TypeError:
122 except TypeError:
163 raise
123 raise
164
124
165 # batched call argument encoding
125 # batched call argument encoding
166
126
167 def escapearg(plain):
127 def escapearg(plain):
168 return (plain
128 return (plain
169 .replace(':', ':c')
129 .replace(':', ':c')
170 .replace(',', ':o')
130 .replace(',', ':o')
171 .replace(';', ':s')
131 .replace(';', ':s')
172 .replace('=', ':e'))
132 .replace('=', ':e'))
173
133
174 def unescapearg(escaped):
134 def unescapearg(escaped):
175 return (escaped
135 return (escaped
176 .replace(':e', '=')
136 .replace(':e', '=')
177 .replace(':s', ';')
137 .replace(':s', ';')
178 .replace(':o', ',')
138 .replace(':o', ',')
179 .replace(':c', ':'))
139 .replace(':c', ':'))
180
140
181 def encodebatchcmds(req):
141 def encodebatchcmds(req):
182 """Return a ``cmds`` argument value for the ``batch`` command."""
142 """Return a ``cmds`` argument value for the ``batch`` command."""
183 cmds = []
143 cmds = []
184 for op, argsdict in req:
144 for op, argsdict in req:
185 # Old servers didn't properly unescape argument names. So prevent
145 # Old servers didn't properly unescape argument names. So prevent
186 # the sending of argument names that may not be decoded properly by
146 # the sending of argument names that may not be decoded properly by
187 # servers.
147 # servers.
188 assert all(escapearg(k) == k for k in argsdict)
148 assert all(escapearg(k) == k for k in argsdict)
189
149
190 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
150 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
191 for k, v in argsdict.iteritems())
151 for k, v in argsdict.iteritems())
192 cmds.append('%s %s' % (op, args))
152 cmds.append('%s %s' % (op, args))
193
153
194 return ';'.join(cmds)
154 return ';'.join(cmds)
195
155
196 # mapping of options accepted by getbundle and their types
156 # mapping of options accepted by getbundle and their types
197 #
157 #
198 # Meant to be extended by extensions. It is extensions responsibility to ensure
158 # Meant to be extended by extensions. It is extensions responsibility to ensure
199 # such options are properly processed in exchange.getbundle.
159 # such options are properly processed in exchange.getbundle.
200 #
160 #
201 # supported types are:
161 # supported types are:
202 #
162 #
203 # :nodes: list of binary nodes
163 # :nodes: list of binary nodes
204 # :csv: list of comma-separated values
164 # :csv: list of comma-separated values
205 # :scsv: list of comma-separated values return as set
165 # :scsv: list of comma-separated values return as set
206 # :plain: string with no transformation needed.
166 # :plain: string with no transformation needed.
207 gboptsmap = {'heads': 'nodes',
167 gboptsmap = {'heads': 'nodes',
208 'bookmarks': 'boolean',
168 'bookmarks': 'boolean',
209 'common': 'nodes',
169 'common': 'nodes',
210 'obsmarkers': 'boolean',
170 'obsmarkers': 'boolean',
211 'phases': 'boolean',
171 'phases': 'boolean',
212 'bundlecaps': 'scsv',
172 'bundlecaps': 'scsv',
213 'listkeys': 'csv',
173 'listkeys': 'csv',
214 'cg': 'boolean',
174 'cg': 'boolean',
215 'cbattempted': 'boolean',
175 'cbattempted': 'boolean',
216 'stream': 'boolean',
176 'stream': 'boolean',
217 }
177 }
218
178
219 # client side
179 # client side
220
180
221 class wirepeer(repository.legacypeer):
181 class wirepeer(repository.legacypeer):
222 """Client-side interface for communicating with a peer repository.
182 """Client-side interface for communicating with a peer repository.
223
183
224 Methods commonly call wire protocol commands of the same name.
184 Methods commonly call wire protocol commands of the same name.
225
185
226 See also httppeer.py and sshpeer.py for protocol-specific
186 See also httppeer.py and sshpeer.py for protocol-specific
227 implementations of this interface.
187 implementations of this interface.
228 """
188 """
229 # Begin of basewirepeer interface.
189 # Begin of basewirepeer interface.
230
190
231 def iterbatch(self):
191 def iterbatch(self):
232 return remoteiterbatcher(self)
192 return remoteiterbatcher(self)
233
193
234 @batchable
194 @batchable
235 def lookup(self, key):
195 def lookup(self, key):
236 self.requirecap('lookup', _('look up remote revision'))
196 self.requirecap('lookup', _('look up remote revision'))
237 f = future()
197 f = future()
238 yield {'key': encoding.fromlocal(key)}, f
198 yield {'key': encoding.fromlocal(key)}, f
239 d = f.value
199 d = f.value
240 success, data = d[:-1].split(" ", 1)
200 success, data = d[:-1].split(" ", 1)
241 if int(success):
201 if int(success):
242 yield bin(data)
202 yield bin(data)
243 else:
203 else:
244 self._abort(error.RepoError(data))
204 self._abort(error.RepoError(data))
245
205
246 @batchable
206 @batchable
247 def heads(self):
207 def heads(self):
248 f = future()
208 f = future()
249 yield {}, f
209 yield {}, f
250 d = f.value
210 d = f.value
251 try:
211 try:
252 yield decodelist(d[:-1])
212 yield decodelist(d[:-1])
253 except ValueError:
213 except ValueError:
254 self._abort(error.ResponseError(_("unexpected response:"), d))
214 self._abort(error.ResponseError(_("unexpected response:"), d))
255
215
256 @batchable
216 @batchable
257 def known(self, nodes):
217 def known(self, nodes):
258 f = future()
218 f = future()
259 yield {'nodes': encodelist(nodes)}, f
219 yield {'nodes': encodelist(nodes)}, f
260 d = f.value
220 d = f.value
261 try:
221 try:
262 yield [bool(int(b)) for b in d]
222 yield [bool(int(b)) for b in d]
263 except ValueError:
223 except ValueError:
264 self._abort(error.ResponseError(_("unexpected response:"), d))
224 self._abort(error.ResponseError(_("unexpected response:"), d))
265
225
266 @batchable
226 @batchable
267 def branchmap(self):
227 def branchmap(self):
268 f = future()
228 f = future()
269 yield {}, f
229 yield {}, f
270 d = f.value
230 d = f.value
271 try:
231 try:
272 branchmap = {}
232 branchmap = {}
273 for branchpart in d.splitlines():
233 for branchpart in d.splitlines():
274 branchname, branchheads = branchpart.split(' ', 1)
234 branchname, branchheads = branchpart.split(' ', 1)
275 branchname = encoding.tolocal(urlreq.unquote(branchname))
235 branchname = encoding.tolocal(urlreq.unquote(branchname))
276 branchheads = decodelist(branchheads)
236 branchheads = decodelist(branchheads)
277 branchmap[branchname] = branchheads
237 branchmap[branchname] = branchheads
278 yield branchmap
238 yield branchmap
279 except TypeError:
239 except TypeError:
280 self._abort(error.ResponseError(_("unexpected response:"), d))
240 self._abort(error.ResponseError(_("unexpected response:"), d))
281
241
282 @batchable
242 @batchable
283 def listkeys(self, namespace):
243 def listkeys(self, namespace):
284 if not self.capable('pushkey'):
244 if not self.capable('pushkey'):
285 yield {}, None
245 yield {}, None
286 f = future()
246 f = future()
287 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
247 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
288 yield {'namespace': encoding.fromlocal(namespace)}, f
248 yield {'namespace': encoding.fromlocal(namespace)}, f
289 d = f.value
249 d = f.value
290 self.ui.debug('received listkey for "%s": %i bytes\n'
250 self.ui.debug('received listkey for "%s": %i bytes\n'
291 % (namespace, len(d)))
251 % (namespace, len(d)))
292 yield pushkeymod.decodekeys(d)
252 yield pushkeymod.decodekeys(d)
293
253
294 @batchable
254 @batchable
295 def pushkey(self, namespace, key, old, new):
255 def pushkey(self, namespace, key, old, new):
296 if not self.capable('pushkey'):
256 if not self.capable('pushkey'):
297 yield False, None
257 yield False, None
298 f = future()
258 f = future()
299 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
259 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
300 yield {'namespace': encoding.fromlocal(namespace),
260 yield {'namespace': encoding.fromlocal(namespace),
301 'key': encoding.fromlocal(key),
261 'key': encoding.fromlocal(key),
302 'old': encoding.fromlocal(old),
262 'old': encoding.fromlocal(old),
303 'new': encoding.fromlocal(new)}, f
263 'new': encoding.fromlocal(new)}, f
304 d = f.value
264 d = f.value
305 d, output = d.split('\n', 1)
265 d, output = d.split('\n', 1)
306 try:
266 try:
307 d = bool(int(d))
267 d = bool(int(d))
308 except ValueError:
268 except ValueError:
309 raise error.ResponseError(
269 raise error.ResponseError(
310 _('push failed (unexpected response):'), d)
270 _('push failed (unexpected response):'), d)
311 for l in output.splitlines(True):
271 for l in output.splitlines(True):
312 self.ui.status(_('remote: '), l)
272 self.ui.status(_('remote: '), l)
313 yield d
273 yield d
314
274
315 def stream_out(self):
275 def stream_out(self):
316 return self._callstream('stream_out')
276 return self._callstream('stream_out')
317
277
318 def getbundle(self, source, **kwargs):
278 def getbundle(self, source, **kwargs):
319 kwargs = pycompat.byteskwargs(kwargs)
279 kwargs = pycompat.byteskwargs(kwargs)
320 self.requirecap('getbundle', _('look up remote changes'))
280 self.requirecap('getbundle', _('look up remote changes'))
321 opts = {}
281 opts = {}
322 bundlecaps = kwargs.get('bundlecaps')
282 bundlecaps = kwargs.get('bundlecaps')
323 if bundlecaps is not None:
283 if bundlecaps is not None:
324 kwargs['bundlecaps'] = sorted(bundlecaps)
284 kwargs['bundlecaps'] = sorted(bundlecaps)
325 else:
285 else:
326 bundlecaps = () # kwargs could have it to None
286 bundlecaps = () # kwargs could have it to None
327 for key, value in kwargs.iteritems():
287 for key, value in kwargs.iteritems():
328 if value is None:
288 if value is None:
329 continue
289 continue
330 keytype = gboptsmap.get(key)
290 keytype = gboptsmap.get(key)
331 if keytype is None:
291 if keytype is None:
332 raise error.ProgrammingError(
292 raise error.ProgrammingError(
333 'Unexpectedly None keytype for key %s' % key)
293 'Unexpectedly None keytype for key %s' % key)
334 elif keytype == 'nodes':
294 elif keytype == 'nodes':
335 value = encodelist(value)
295 value = encodelist(value)
336 elif keytype in ('csv', 'scsv'):
296 elif keytype in ('csv', 'scsv'):
337 value = ','.join(value)
297 value = ','.join(value)
338 elif keytype == 'boolean':
298 elif keytype == 'boolean':
339 value = '%i' % bool(value)
299 value = '%i' % bool(value)
340 elif keytype != 'plain':
300 elif keytype != 'plain':
341 raise KeyError('unknown getbundle option type %s'
301 raise KeyError('unknown getbundle option type %s'
342 % keytype)
302 % keytype)
343 opts[key] = value
303 opts[key] = value
344 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
304 f = self._callcompressable("getbundle", **pycompat.strkwargs(opts))
345 if any((cap.startswith('HG2') for cap in bundlecaps)):
305 if any((cap.startswith('HG2') for cap in bundlecaps)):
346 return bundle2.getunbundler(self.ui, f)
306 return bundle2.getunbundler(self.ui, f)
347 else:
307 else:
348 return changegroupmod.cg1unpacker(f, 'UN')
308 return changegroupmod.cg1unpacker(f, 'UN')
349
309
350 def unbundle(self, cg, heads, url):
310 def unbundle(self, cg, heads, url):
351 '''Send cg (a readable file-like object representing the
311 '''Send cg (a readable file-like object representing the
352 changegroup to push, typically a chunkbuffer object) to the
312 changegroup to push, typically a chunkbuffer object) to the
353 remote server as a bundle.
313 remote server as a bundle.
354
314
355 When pushing a bundle10 stream, return an integer indicating the
315 When pushing a bundle10 stream, return an integer indicating the
356 result of the push (see changegroup.apply()).
316 result of the push (see changegroup.apply()).
357
317
358 When pushing a bundle20 stream, return a bundle20 stream.
318 When pushing a bundle20 stream, return a bundle20 stream.
359
319
360 `url` is the url the client thinks it's pushing to, which is
320 `url` is the url the client thinks it's pushing to, which is
361 visible to hooks.
321 visible to hooks.
362 '''
322 '''
363
323
364 if heads != ['force'] and self.capable('unbundlehash'):
324 if heads != ['force'] and self.capable('unbundlehash'):
365 heads = encodelist(['hashed',
325 heads = encodelist(['hashed',
366 hashlib.sha1(''.join(sorted(heads))).digest()])
326 hashlib.sha1(''.join(sorted(heads))).digest()])
367 else:
327 else:
368 heads = encodelist(heads)
328 heads = encodelist(heads)
369
329
370 if util.safehasattr(cg, 'deltaheader'):
330 if util.safehasattr(cg, 'deltaheader'):
371 # this a bundle10, do the old style call sequence
331 # this a bundle10, do the old style call sequence
372 ret, output = self._callpush("unbundle", cg, heads=heads)
332 ret, output = self._callpush("unbundle", cg, heads=heads)
373 if ret == "":
333 if ret == "":
374 raise error.ResponseError(
334 raise error.ResponseError(
375 _('push failed:'), output)
335 _('push failed:'), output)
376 try:
336 try:
377 ret = int(ret)
337 ret = int(ret)
378 except ValueError:
338 except ValueError:
379 raise error.ResponseError(
339 raise error.ResponseError(
380 _('push failed (unexpected response):'), ret)
340 _('push failed (unexpected response):'), ret)
381
341
382 for l in output.splitlines(True):
342 for l in output.splitlines(True):
383 self.ui.status(_('remote: '), l)
343 self.ui.status(_('remote: '), l)
384 else:
344 else:
385 # bundle2 push. Send a stream, fetch a stream.
345 # bundle2 push. Send a stream, fetch a stream.
386 stream = self._calltwowaystream('unbundle', cg, heads=heads)
346 stream = self._calltwowaystream('unbundle', cg, heads=heads)
387 ret = bundle2.getunbundler(self.ui, stream)
347 ret = bundle2.getunbundler(self.ui, stream)
388 return ret
348 return ret
389
349
390 # End of basewirepeer interface.
350 # End of basewirepeer interface.
391
351
392 # Begin of baselegacywirepeer interface.
352 # Begin of baselegacywirepeer interface.
393
353
394 def branches(self, nodes):
354 def branches(self, nodes):
395 n = encodelist(nodes)
355 n = encodelist(nodes)
396 d = self._call("branches", nodes=n)
356 d = self._call("branches", nodes=n)
397 try:
357 try:
398 br = [tuple(decodelist(b)) for b in d.splitlines()]
358 br = [tuple(decodelist(b)) for b in d.splitlines()]
399 return br
359 return br
400 except ValueError:
360 except ValueError:
401 self._abort(error.ResponseError(_("unexpected response:"), d))
361 self._abort(error.ResponseError(_("unexpected response:"), d))
402
362
403 def between(self, pairs):
363 def between(self, pairs):
404 batch = 8 # avoid giant requests
364 batch = 8 # avoid giant requests
405 r = []
365 r = []
406 for i in xrange(0, len(pairs), batch):
366 for i in xrange(0, len(pairs), batch):
407 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
367 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
408 d = self._call("between", pairs=n)
368 d = self._call("between", pairs=n)
409 try:
369 try:
410 r.extend(l and decodelist(l) or [] for l in d.splitlines())
370 r.extend(l and decodelist(l) or [] for l in d.splitlines())
411 except ValueError:
371 except ValueError:
412 self._abort(error.ResponseError(_("unexpected response:"), d))
372 self._abort(error.ResponseError(_("unexpected response:"), d))
413 return r
373 return r
414
374
415 def changegroup(self, nodes, kind):
375 def changegroup(self, nodes, kind):
416 n = encodelist(nodes)
376 n = encodelist(nodes)
417 f = self._callcompressable("changegroup", roots=n)
377 f = self._callcompressable("changegroup", roots=n)
418 return changegroupmod.cg1unpacker(f, 'UN')
378 return changegroupmod.cg1unpacker(f, 'UN')
419
379
420 def changegroupsubset(self, bases, heads, kind):
380 def changegroupsubset(self, bases, heads, kind):
421 self.requirecap('changegroupsubset', _('look up remote changes'))
381 self.requirecap('changegroupsubset', _('look up remote changes'))
422 bases = encodelist(bases)
382 bases = encodelist(bases)
423 heads = encodelist(heads)
383 heads = encodelist(heads)
424 f = self._callcompressable("changegroupsubset",
384 f = self._callcompressable("changegroupsubset",
425 bases=bases, heads=heads)
385 bases=bases, heads=heads)
426 return changegroupmod.cg1unpacker(f, 'UN')
386 return changegroupmod.cg1unpacker(f, 'UN')
427
387
428 # End of baselegacywirepeer interface.
388 # End of baselegacywirepeer interface.
429
389
430 def _submitbatch(self, req):
390 def _submitbatch(self, req):
431 """run batch request <req> on the server
391 """run batch request <req> on the server
432
392
433 Returns an iterator of the raw responses from the server.
393 Returns an iterator of the raw responses from the server.
434 """
394 """
435 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
395 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
436 chunk = rsp.read(1024)
396 chunk = rsp.read(1024)
437 work = [chunk]
397 work = [chunk]
438 while chunk:
398 while chunk:
439 while ';' not in chunk and chunk:
399 while ';' not in chunk and chunk:
440 chunk = rsp.read(1024)
400 chunk = rsp.read(1024)
441 work.append(chunk)
401 work.append(chunk)
442 merged = ''.join(work)
402 merged = ''.join(work)
443 while ';' in merged:
403 while ';' in merged:
444 one, merged = merged.split(';', 1)
404 one, merged = merged.split(';', 1)
445 yield unescapearg(one)
405 yield unescapearg(one)
446 chunk = rsp.read(1024)
406 chunk = rsp.read(1024)
447 work = [merged, chunk]
407 work = [merged, chunk]
448 yield unescapearg(''.join(work))
408 yield unescapearg(''.join(work))
449
409
450 def _submitone(self, op, args):
410 def _submitone(self, op, args):
451 return self._call(op, **pycompat.strkwargs(args))
411 return self._call(op, **pycompat.strkwargs(args))
452
412
453 def debugwireargs(self, one, two, three=None, four=None, five=None):
413 def debugwireargs(self, one, two, three=None, four=None, five=None):
454 # don't pass optional arguments left at their default value
414 # don't pass optional arguments left at their default value
455 opts = {}
415 opts = {}
456 if three is not None:
416 if three is not None:
457 opts[r'three'] = three
417 opts[r'three'] = three
458 if four is not None:
418 if four is not None:
459 opts[r'four'] = four
419 opts[r'four'] = four
460 return self._call('debugwireargs', one=one, two=two, **opts)
420 return self._call('debugwireargs', one=one, two=two, **opts)
461
421
462 def _call(self, cmd, **args):
422 def _call(self, cmd, **args):
463 """execute <cmd> on the server
423 """execute <cmd> on the server
464
424
465 The command is expected to return a simple string.
425 The command is expected to return a simple string.
466
426
467 returns the server reply as a string."""
427 returns the server reply as a string."""
468 raise NotImplementedError()
428 raise NotImplementedError()
469
429
470 def _callstream(self, cmd, **args):
430 def _callstream(self, cmd, **args):
471 """execute <cmd> on the server
431 """execute <cmd> on the server
472
432
473 The command is expected to return a stream. Note that if the
433 The command is expected to return a stream. Note that if the
474 command doesn't return a stream, _callstream behaves
434 command doesn't return a stream, _callstream behaves
475 differently for ssh and http peers.
435 differently for ssh and http peers.
476
436
477 returns the server reply as a file like object.
437 returns the server reply as a file like object.
478 """
438 """
479 raise NotImplementedError()
439 raise NotImplementedError()
480
440
481 def _callcompressable(self, cmd, **args):
441 def _callcompressable(self, cmd, **args):
482 """execute <cmd> on the server
442 """execute <cmd> on the server
483
443
484 The command is expected to return a stream.
444 The command is expected to return a stream.
485
445
486 The stream may have been compressed in some implementations. This
446 The stream may have been compressed in some implementations. This
487 function takes care of the decompression. This is the only difference
447 function takes care of the decompression. This is the only difference
488 with _callstream.
448 with _callstream.
489
449
490 returns the server reply as a file like object.
450 returns the server reply as a file like object.
491 """
451 """
492 raise NotImplementedError()
452 raise NotImplementedError()
493
453
494 def _callpush(self, cmd, fp, **args):
454 def _callpush(self, cmd, fp, **args):
495 """execute a <cmd> on server
455 """execute a <cmd> on server
496
456
497 The command is expected to be related to a push. Push has a special
457 The command is expected to be related to a push. Push has a special
498 return method.
458 return method.
499
459
500 returns the server reply as a (ret, output) tuple. ret is either
460 returns the server reply as a (ret, output) tuple. ret is either
501 empty (error) or a stringified int.
461 empty (error) or a stringified int.
502 """
462 """
503 raise NotImplementedError()
463 raise NotImplementedError()
504
464
505 def _calltwowaystream(self, cmd, fp, **args):
465 def _calltwowaystream(self, cmd, fp, **args):
506 """execute <cmd> on server
466 """execute <cmd> on server
507
467
508 The command will send a stream to the server and get a stream in reply.
468 The command will send a stream to the server and get a stream in reply.
509 """
469 """
510 raise NotImplementedError()
470 raise NotImplementedError()
511
471
512 def _abort(self, exception):
472 def _abort(self, exception):
513 """clearly abort the wire protocol connection and raise the exception
473 """clearly abort the wire protocol connection and raise the exception
514 """
474 """
515 raise NotImplementedError()
475 raise NotImplementedError()
516
476
517 # server side
477 # server side
518
478
519 # wire protocol command can either return a string or one of these classes.
479 # wire protocol command can either return a string or one of these classes.
520 class streamres(object):
480 class streamres(object):
521 """wireproto reply: binary stream
481 """wireproto reply: binary stream
522
482
523 The call was successful and the result is a stream.
483 The call was successful and the result is a stream.
524
484
525 Accepts a generator containing chunks of data to be sent to the client.
485 Accepts a generator containing chunks of data to be sent to the client.
526
486
527 ``prefer_uncompressed`` indicates that the data is expected to be
487 ``prefer_uncompressed`` indicates that the data is expected to be
528 uncompressable and that the stream should therefore use the ``none``
488 uncompressable and that the stream should therefore use the ``none``
529 engine.
489 engine.
530 """
490 """
531 def __init__(self, gen=None, prefer_uncompressed=False):
491 def __init__(self, gen=None, prefer_uncompressed=False):
532 self.gen = gen
492 self.gen = gen
533 self.prefer_uncompressed = prefer_uncompressed
493 self.prefer_uncompressed = prefer_uncompressed
534
494
535 class streamres_legacy(object):
495 class streamres_legacy(object):
536 """wireproto reply: uncompressed binary stream
496 """wireproto reply: uncompressed binary stream
537
497
538 The call was successful and the result is a stream.
498 The call was successful and the result is a stream.
539
499
540 Accepts a generator containing chunks of data to be sent to the client.
500 Accepts a generator containing chunks of data to be sent to the client.
541
501
542 Like ``streamres``, but sends an uncompressed data for "version 1" clients
502 Like ``streamres``, but sends an uncompressed data for "version 1" clients
543 using the application/mercurial-0.1 media type.
503 using the application/mercurial-0.1 media type.
544 """
504 """
545 def __init__(self, gen=None):
505 def __init__(self, gen=None):
546 self.gen = gen
506 self.gen = gen
547
507
548 class pushres(object):
508 class pushres(object):
549 """wireproto reply: success with simple integer return
509 """wireproto reply: success with simple integer return
550
510
551 The call was successful and returned an integer contained in `self.res`.
511 The call was successful and returned an integer contained in `self.res`.
552 """
512 """
553 def __init__(self, res):
513 def __init__(self, res):
554 self.res = res
514 self.res = res
555
515
556 class pusherr(object):
516 class pusherr(object):
557 """wireproto reply: failure
517 """wireproto reply: failure
558
518
559 The call failed. The `self.res` attribute contains the error message.
519 The call failed. The `self.res` attribute contains the error message.
560 """
520 """
561 def __init__(self, res):
521 def __init__(self, res):
562 self.res = res
522 self.res = res
563
523
564 class ooberror(object):
524 class ooberror(object):
565 """wireproto reply: failure of a batch of operation
525 """wireproto reply: failure of a batch of operation
566
526
567 Something failed during a batch call. The error message is stored in
527 Something failed during a batch call. The error message is stored in
568 `self.message`.
528 `self.message`.
569 """
529 """
570 def __init__(self, message):
530 def __init__(self, message):
571 self.message = message
531 self.message = message
572
532
573 def getdispatchrepo(repo, proto, command):
533 def getdispatchrepo(repo, proto, command):
574 """Obtain the repo used for processing wire protocol commands.
534 """Obtain the repo used for processing wire protocol commands.
575
535
576 The intent of this function is to serve as a monkeypatch point for
536 The intent of this function is to serve as a monkeypatch point for
577 extensions that need commands to operate on different repo views under
537 extensions that need commands to operate on different repo views under
578 specialized circumstances.
538 specialized circumstances.
579 """
539 """
580 return repo.filtered('served')
540 return repo.filtered('served')
581
541
582 def dispatch(repo, proto, command):
542 def dispatch(repo, proto, command):
583 repo = getdispatchrepo(repo, proto, command)
543 repo = getdispatchrepo(repo, proto, command)
584 func, spec = commands[command]
544 func, spec = commands[command]
585 args = proto.getargs(spec)
545 args = proto.getargs(spec)
586 return func(repo, proto, *args)
546 return func(repo, proto, *args)
587
547
588 def options(cmd, keys, others):
548 def options(cmd, keys, others):
589 opts = {}
549 opts = {}
590 for k in keys:
550 for k in keys:
591 if k in others:
551 if k in others:
592 opts[k] = others[k]
552 opts[k] = others[k]
593 del others[k]
553 del others[k]
594 if others:
554 if others:
595 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
555 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
596 % (cmd, ",".join(others)))
556 % (cmd, ",".join(others)))
597 return opts
557 return opts
598
558
599 def bundle1allowed(repo, action):
559 def bundle1allowed(repo, action):
600 """Whether a bundle1 operation is allowed from the server.
560 """Whether a bundle1 operation is allowed from the server.
601
561
602 Priority is:
562 Priority is:
603
563
604 1. server.bundle1gd.<action> (if generaldelta active)
564 1. server.bundle1gd.<action> (if generaldelta active)
605 2. server.bundle1.<action>
565 2. server.bundle1.<action>
606 3. server.bundle1gd (if generaldelta active)
566 3. server.bundle1gd (if generaldelta active)
607 4. server.bundle1
567 4. server.bundle1
608 """
568 """
609 ui = repo.ui
569 ui = repo.ui
610 gd = 'generaldelta' in repo.requirements
570 gd = 'generaldelta' in repo.requirements
611
571
612 if gd:
572 if gd:
613 v = ui.configbool('server', 'bundle1gd.%s' % action)
573 v = ui.configbool('server', 'bundle1gd.%s' % action)
614 if v is not None:
574 if v is not None:
615 return v
575 return v
616
576
617 v = ui.configbool('server', 'bundle1.%s' % action)
577 v = ui.configbool('server', 'bundle1.%s' % action)
618 if v is not None:
578 if v is not None:
619 return v
579 return v
620
580
621 if gd:
581 if gd:
622 v = ui.configbool('server', 'bundle1gd')
582 v = ui.configbool('server', 'bundle1gd')
623 if v is not None:
583 if v is not None:
624 return v
584 return v
625
585
626 return ui.configbool('server', 'bundle1')
586 return ui.configbool('server', 'bundle1')
627
587
628 def supportedcompengines(ui, proto, role):
588 def supportedcompengines(ui, proto, role):
629 """Obtain the list of supported compression engines for a request."""
589 """Obtain the list of supported compression engines for a request."""
630 assert role in (util.CLIENTROLE, util.SERVERROLE)
590 assert role in (util.CLIENTROLE, util.SERVERROLE)
631
591
632 compengines = util.compengines.supportedwireengines(role)
592 compengines = util.compengines.supportedwireengines(role)
633
593
634 # Allow config to override default list and ordering.
594 # Allow config to override default list and ordering.
635 if role == util.SERVERROLE:
595 if role == util.SERVERROLE:
636 configengines = ui.configlist('server', 'compressionengines')
596 configengines = ui.configlist('server', 'compressionengines')
637 config = 'server.compressionengines'
597 config = 'server.compressionengines'
638 else:
598 else:
639 # This is currently implemented mainly to facilitate testing. In most
599 # This is currently implemented mainly to facilitate testing. In most
640 # cases, the server should be in charge of choosing a compression engine
600 # cases, the server should be in charge of choosing a compression engine
641 # because a server has the most to lose from a sub-optimal choice. (e.g.
601 # because a server has the most to lose from a sub-optimal choice. (e.g.
642 # CPU DoS due to an expensive engine or a network DoS due to poor
602 # CPU DoS due to an expensive engine or a network DoS due to poor
643 # compression ratio).
603 # compression ratio).
644 configengines = ui.configlist('experimental',
604 configengines = ui.configlist('experimental',
645 'clientcompressionengines')
605 'clientcompressionengines')
646 config = 'experimental.clientcompressionengines'
606 config = 'experimental.clientcompressionengines'
647
607
648 # No explicit config. Filter out the ones that aren't supposed to be
608 # No explicit config. Filter out the ones that aren't supposed to be
649 # advertised and return default ordering.
609 # advertised and return default ordering.
650 if not configengines:
610 if not configengines:
651 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
611 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
652 return [e for e in compengines
612 return [e for e in compengines
653 if getattr(e.wireprotosupport(), attr) > 0]
613 if getattr(e.wireprotosupport(), attr) > 0]
654
614
655 # If compression engines are listed in the config, assume there is a good
615 # If compression engines are listed in the config, assume there is a good
656 # reason for it (like server operators wanting to achieve specific
616 # reason for it (like server operators wanting to achieve specific
657 # performance characteristics). So fail fast if the config references
617 # performance characteristics). So fail fast if the config references
658 # unusable compression engines.
618 # unusable compression engines.
659 validnames = set(e.name() for e in compengines)
619 validnames = set(e.name() for e in compengines)
660 invalidnames = set(e for e in configengines if e not in validnames)
620 invalidnames = set(e for e in configengines if e not in validnames)
661 if invalidnames:
621 if invalidnames:
662 raise error.Abort(_('invalid compression engine defined in %s: %s') %
622 raise error.Abort(_('invalid compression engine defined in %s: %s') %
663 (config, ', '.join(sorted(invalidnames))))
623 (config, ', '.join(sorted(invalidnames))))
664
624
665 compengines = [e for e in compengines if e.name() in configengines]
625 compengines = [e for e in compengines if e.name() in configengines]
666 compengines = sorted(compengines,
626 compengines = sorted(compengines,
667 key=lambda e: configengines.index(e.name()))
627 key=lambda e: configengines.index(e.name()))
668
628
669 if not compengines:
629 if not compengines:
670 raise error.Abort(_('%s config option does not specify any known '
630 raise error.Abort(_('%s config option does not specify any known '
671 'compression engines') % config,
631 'compression engines') % config,
672 hint=_('usable compression engines: %s') %
632 hint=_('usable compression engines: %s') %
673 ', '.sorted(validnames))
633 ', '.sorted(validnames))
674
634
675 return compengines
635 return compengines
676
636
677 # list of commands
637 # list of commands
678 commands = {}
638 commands = {}
679
639
680 def wireprotocommand(name, args=''):
640 def wireprotocommand(name, args=''):
681 """decorator for wire protocol command"""
641 """decorator for wire protocol command"""
682 def register(func):
642 def register(func):
683 commands[name] = (func, args)
643 commands[name] = (func, args)
684 return func
644 return func
685 return register
645 return register
686
646
687 @wireprotocommand('batch', 'cmds *')
647 @wireprotocommand('batch', 'cmds *')
688 def batch(repo, proto, cmds, others):
648 def batch(repo, proto, cmds, others):
689 repo = repo.filtered("served")
649 repo = repo.filtered("served")
690 res = []
650 res = []
691 for pair in cmds.split(';'):
651 for pair in cmds.split(';'):
692 op, args = pair.split(' ', 1)
652 op, args = pair.split(' ', 1)
693 vals = {}
653 vals = {}
694 for a in args.split(','):
654 for a in args.split(','):
695 if a:
655 if a:
696 n, v = a.split('=')
656 n, v = a.split('=')
697 vals[unescapearg(n)] = unescapearg(v)
657 vals[unescapearg(n)] = unescapearg(v)
698 func, spec = commands[op]
658 func, spec = commands[op]
699 if spec:
659 if spec:
700 keys = spec.split()
660 keys = spec.split()
701 data = {}
661 data = {}
702 for k in keys:
662 for k in keys:
703 if k == '*':
663 if k == '*':
704 star = {}
664 star = {}
705 for key in vals.keys():
665 for key in vals.keys():
706 if key not in keys:
666 if key not in keys:
707 star[key] = vals[key]
667 star[key] = vals[key]
708 data['*'] = star
668 data['*'] = star
709 else:
669 else:
710 data[k] = vals[k]
670 data[k] = vals[k]
711 result = func(repo, proto, *[data[k] for k in keys])
671 result = func(repo, proto, *[data[k] for k in keys])
712 else:
672 else:
713 result = func(repo, proto)
673 result = func(repo, proto)
714 if isinstance(result, ooberror):
674 if isinstance(result, ooberror):
715 return result
675 return result
716 res.append(escapearg(result))
676 res.append(escapearg(result))
717 return ';'.join(res)
677 return ';'.join(res)
718
678
719 @wireprotocommand('between', 'pairs')
679 @wireprotocommand('between', 'pairs')
720 def between(repo, proto, pairs):
680 def between(repo, proto, pairs):
721 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
681 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
722 r = []
682 r = []
723 for b in repo.between(pairs):
683 for b in repo.between(pairs):
724 r.append(encodelist(b) + "\n")
684 r.append(encodelist(b) + "\n")
725 return "".join(r)
685 return "".join(r)
726
686
727 @wireprotocommand('branchmap')
687 @wireprotocommand('branchmap')
728 def branchmap(repo, proto):
688 def branchmap(repo, proto):
729 branchmap = repo.branchmap()
689 branchmap = repo.branchmap()
730 heads = []
690 heads = []
731 for branch, nodes in branchmap.iteritems():
691 for branch, nodes in branchmap.iteritems():
732 branchname = urlreq.quote(encoding.fromlocal(branch))
692 branchname = urlreq.quote(encoding.fromlocal(branch))
733 branchnodes = encodelist(nodes)
693 branchnodes = encodelist(nodes)
734 heads.append('%s %s' % (branchname, branchnodes))
694 heads.append('%s %s' % (branchname, branchnodes))
735 return '\n'.join(heads)
695 return '\n'.join(heads)
736
696
737 @wireprotocommand('branches', 'nodes')
697 @wireprotocommand('branches', 'nodes')
738 def branches(repo, proto, nodes):
698 def branches(repo, proto, nodes):
739 nodes = decodelist(nodes)
699 nodes = decodelist(nodes)
740 r = []
700 r = []
741 for b in repo.branches(nodes):
701 for b in repo.branches(nodes):
742 r.append(encodelist(b) + "\n")
702 r.append(encodelist(b) + "\n")
743 return "".join(r)
703 return "".join(r)
744
704
745 @wireprotocommand('clonebundles', '')
705 @wireprotocommand('clonebundles', '')
746 def clonebundles(repo, proto):
706 def clonebundles(repo, proto):
747 """Server command for returning info for available bundles to seed clones.
707 """Server command for returning info for available bundles to seed clones.
748
708
749 Clients will parse this response and determine what bundle to fetch.
709 Clients will parse this response and determine what bundle to fetch.
750
710
751 Extensions may wrap this command to filter or dynamically emit data
711 Extensions may wrap this command to filter or dynamically emit data
752 depending on the request. e.g. you could advertise URLs for the closest
712 depending on the request. e.g. you could advertise URLs for the closest
753 data center given the client's IP address.
713 data center given the client's IP address.
754 """
714 """
755 return repo.vfs.tryread('clonebundles.manifest')
715 return repo.vfs.tryread('clonebundles.manifest')
756
716
757 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
717 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
758 'known', 'getbundle', 'unbundlehash', 'batch']
718 'known', 'getbundle', 'unbundlehash', 'batch']
759
719
760 def _capabilities(repo, proto):
720 def _capabilities(repo, proto):
761 """return a list of capabilities for a repo
721 """return a list of capabilities for a repo
762
722
763 This function exists to allow extensions to easily wrap capabilities
723 This function exists to allow extensions to easily wrap capabilities
764 computation
724 computation
765
725
766 - returns a lists: easy to alter
726 - returns a lists: easy to alter
767 - change done here will be propagated to both `capabilities` and `hello`
727 - change done here will be propagated to both `capabilities` and `hello`
768 command without any other action needed.
728 command without any other action needed.
769 """
729 """
770 # copy to prevent modification of the global list
730 # copy to prevent modification of the global list
771 caps = list(wireprotocaps)
731 caps = list(wireprotocaps)
772 if streamclone.allowservergeneration(repo):
732 if streamclone.allowservergeneration(repo):
773 if repo.ui.configbool('server', 'preferuncompressed'):
733 if repo.ui.configbool('server', 'preferuncompressed'):
774 caps.append('stream-preferred')
734 caps.append('stream-preferred')
775 requiredformats = repo.requirements & repo.supportedformats
735 requiredformats = repo.requirements & repo.supportedformats
776 # if our local revlogs are just revlogv1, add 'stream' cap
736 # if our local revlogs are just revlogv1, add 'stream' cap
777 if not requiredformats - {'revlogv1'}:
737 if not requiredformats - {'revlogv1'}:
778 caps.append('stream')
738 caps.append('stream')
779 # otherwise, add 'streamreqs' detailing our local revlog format
739 # otherwise, add 'streamreqs' detailing our local revlog format
780 else:
740 else:
781 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
741 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
782 if repo.ui.configbool('experimental', 'bundle2-advertise'):
742 if repo.ui.configbool('experimental', 'bundle2-advertise'):
783 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
743 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo, role='server'))
784 caps.append('bundle2=' + urlreq.quote(capsblob))
744 caps.append('bundle2=' + urlreq.quote(capsblob))
785 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
745 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
786
746
787 if proto.name == 'http':
747 if proto.name == 'http':
788 caps.append('httpheader=%d' %
748 caps.append('httpheader=%d' %
789 repo.ui.configint('server', 'maxhttpheaderlen'))
749 repo.ui.configint('server', 'maxhttpheaderlen'))
790 if repo.ui.configbool('experimental', 'httppostargs'):
750 if repo.ui.configbool('experimental', 'httppostargs'):
791 caps.append('httppostargs')
751 caps.append('httppostargs')
792
752
793 # FUTURE advertise 0.2rx once support is implemented
753 # FUTURE advertise 0.2rx once support is implemented
794 # FUTURE advertise minrx and mintx after consulting config option
754 # FUTURE advertise minrx and mintx after consulting config option
795 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
755 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
796
756
797 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
757 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
798 if compengines:
758 if compengines:
799 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
759 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
800 for e in compengines)
760 for e in compengines)
801 caps.append('compression=%s' % comptypes)
761 caps.append('compression=%s' % comptypes)
802
762
803 return caps
763 return caps
804
764
805 # If you are writing an extension and consider wrapping this function. Wrap
765 # If you are writing an extension and consider wrapping this function. Wrap
806 # `_capabilities` instead.
766 # `_capabilities` instead.
807 @wireprotocommand('capabilities')
767 @wireprotocommand('capabilities')
808 def capabilities(repo, proto):
768 def capabilities(repo, proto):
809 return ' '.join(_capabilities(repo, proto))
769 return ' '.join(_capabilities(repo, proto))
810
770
811 @wireprotocommand('changegroup', 'roots')
771 @wireprotocommand('changegroup', 'roots')
812 def changegroup(repo, proto, roots):
772 def changegroup(repo, proto, roots):
813 nodes = decodelist(roots)
773 nodes = decodelist(roots)
814 outgoing = discovery.outgoing(repo, missingroots=nodes,
774 outgoing = discovery.outgoing(repo, missingroots=nodes,
815 missingheads=repo.heads())
775 missingheads=repo.heads())
816 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
776 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
817 gen = iter(lambda: cg.read(32768), '')
777 gen = iter(lambda: cg.read(32768), '')
818 return streamres(gen=gen)
778 return streamres(gen=gen)
819
779
820 @wireprotocommand('changegroupsubset', 'bases heads')
780 @wireprotocommand('changegroupsubset', 'bases heads')
821 def changegroupsubset(repo, proto, bases, heads):
781 def changegroupsubset(repo, proto, bases, heads):
822 bases = decodelist(bases)
782 bases = decodelist(bases)
823 heads = decodelist(heads)
783 heads = decodelist(heads)
824 outgoing = discovery.outgoing(repo, missingroots=bases,
784 outgoing = discovery.outgoing(repo, missingroots=bases,
825 missingheads=heads)
785 missingheads=heads)
826 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
786 cg = changegroupmod.makechangegroup(repo, outgoing, '01', 'serve')
827 gen = iter(lambda: cg.read(32768), '')
787 gen = iter(lambda: cg.read(32768), '')
828 return streamres(gen=gen)
788 return streamres(gen=gen)
829
789
830 @wireprotocommand('debugwireargs', 'one two *')
790 @wireprotocommand('debugwireargs', 'one two *')
831 def debugwireargs(repo, proto, one, two, others):
791 def debugwireargs(repo, proto, one, two, others):
832 # only accept optional args from the known set
792 # only accept optional args from the known set
833 opts = options('debugwireargs', ['three', 'four'], others)
793 opts = options('debugwireargs', ['three', 'four'], others)
834 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
794 return repo.debugwireargs(one, two, **pycompat.strkwargs(opts))
835
795
836 @wireprotocommand('getbundle', '*')
796 @wireprotocommand('getbundle', '*')
837 def getbundle(repo, proto, others):
797 def getbundle(repo, proto, others):
838 opts = options('getbundle', gboptsmap.keys(), others)
798 opts = options('getbundle', gboptsmap.keys(), others)
839 for k, v in opts.iteritems():
799 for k, v in opts.iteritems():
840 keytype = gboptsmap[k]
800 keytype = gboptsmap[k]
841 if keytype == 'nodes':
801 if keytype == 'nodes':
842 opts[k] = decodelist(v)
802 opts[k] = decodelist(v)
843 elif keytype == 'csv':
803 elif keytype == 'csv':
844 opts[k] = list(v.split(','))
804 opts[k] = list(v.split(','))
845 elif keytype == 'scsv':
805 elif keytype == 'scsv':
846 opts[k] = set(v.split(','))
806 opts[k] = set(v.split(','))
847 elif keytype == 'boolean':
807 elif keytype == 'boolean':
848 # Client should serialize False as '0', which is a non-empty string
808 # Client should serialize False as '0', which is a non-empty string
849 # so it evaluates as a True bool.
809 # so it evaluates as a True bool.
850 if v == '0':
810 if v == '0':
851 opts[k] = False
811 opts[k] = False
852 else:
812 else:
853 opts[k] = bool(v)
813 opts[k] = bool(v)
854 elif keytype != 'plain':
814 elif keytype != 'plain':
855 raise KeyError('unknown getbundle option type %s'
815 raise KeyError('unknown getbundle option type %s'
856 % keytype)
816 % keytype)
857
817
858 if not bundle1allowed(repo, 'pull'):
818 if not bundle1allowed(repo, 'pull'):
859 if not exchange.bundle2requested(opts.get('bundlecaps')):
819 if not exchange.bundle2requested(opts.get('bundlecaps')):
860 if proto.name == 'http':
820 if proto.name == 'http':
861 return ooberror(bundle2required)
821 return ooberror(bundle2required)
862 raise error.Abort(bundle2requiredmain,
822 raise error.Abort(bundle2requiredmain,
863 hint=bundle2requiredhint)
823 hint=bundle2requiredhint)
864
824
865 prefercompressed = True
825 prefercompressed = True
866
826
867 try:
827 try:
868 if repo.ui.configbool('server', 'disablefullbundle'):
828 if repo.ui.configbool('server', 'disablefullbundle'):
869 # Check to see if this is a full clone.
829 # Check to see if this is a full clone.
870 clheads = set(repo.changelog.heads())
830 clheads = set(repo.changelog.heads())
871 changegroup = opts.get('cg', True)
831 changegroup = opts.get('cg', True)
872 heads = set(opts.get('heads', set()))
832 heads = set(opts.get('heads', set()))
873 common = set(opts.get('common', set()))
833 common = set(opts.get('common', set()))
874 common.discard(nullid)
834 common.discard(nullid)
875 if changegroup and not common and clheads == heads:
835 if changegroup and not common and clheads == heads:
876 raise error.Abort(
836 raise error.Abort(
877 _('server has pull-based clones disabled'),
837 _('server has pull-based clones disabled'),
878 hint=_('remove --pull if specified or upgrade Mercurial'))
838 hint=_('remove --pull if specified or upgrade Mercurial'))
879
839
880 info, chunks = exchange.getbundlechunks(repo, 'serve',
840 info, chunks = exchange.getbundlechunks(repo, 'serve',
881 **pycompat.strkwargs(opts))
841 **pycompat.strkwargs(opts))
882 prefercompressed = info.get('prefercompressed', True)
842 prefercompressed = info.get('prefercompressed', True)
883 except error.Abort as exc:
843 except error.Abort as exc:
884 # cleanly forward Abort error to the client
844 # cleanly forward Abort error to the client
885 if not exchange.bundle2requested(opts.get('bundlecaps')):
845 if not exchange.bundle2requested(opts.get('bundlecaps')):
886 if proto.name == 'http':
846 if proto.name == 'http':
887 return ooberror(str(exc) + '\n')
847 return ooberror(str(exc) + '\n')
888 raise # cannot do better for bundle1 + ssh
848 raise # cannot do better for bundle1 + ssh
889 # bundle2 request expect a bundle2 reply
849 # bundle2 request expect a bundle2 reply
890 bundler = bundle2.bundle20(repo.ui)
850 bundler = bundle2.bundle20(repo.ui)
891 manargs = [('message', str(exc))]
851 manargs = [('message', str(exc))]
892 advargs = []
852 advargs = []
893 if exc.hint is not None:
853 if exc.hint is not None:
894 advargs.append(('hint', exc.hint))
854 advargs.append(('hint', exc.hint))
895 bundler.addpart(bundle2.bundlepart('error:abort',
855 bundler.addpart(bundle2.bundlepart('error:abort',
896 manargs, advargs))
856 manargs, advargs))
897 chunks = bundler.getchunks()
857 chunks = bundler.getchunks()
898 prefercompressed = False
858 prefercompressed = False
899
859
900 return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
860 return streamres(gen=chunks, prefer_uncompressed=not prefercompressed)
901
861
902 @wireprotocommand('heads')
862 @wireprotocommand('heads')
903 def heads(repo, proto):
863 def heads(repo, proto):
904 h = repo.heads()
864 h = repo.heads()
905 return encodelist(h) + "\n"
865 return encodelist(h) + "\n"
906
866
907 @wireprotocommand('hello')
867 @wireprotocommand('hello')
908 def hello(repo, proto):
868 def hello(repo, proto):
909 '''the hello command returns a set of lines describing various
869 '''the hello command returns a set of lines describing various
910 interesting things about the server, in an RFC822-like format.
870 interesting things about the server, in an RFC822-like format.
911 Currently the only one defined is "capabilities", which
871 Currently the only one defined is "capabilities", which
912 consists of a line in the form:
872 consists of a line in the form:
913
873
914 capabilities: space separated list of tokens
874 capabilities: space separated list of tokens
915 '''
875 '''
916 return "capabilities: %s\n" % (capabilities(repo, proto))
876 return "capabilities: %s\n" % (capabilities(repo, proto))
917
877
918 @wireprotocommand('listkeys', 'namespace')
878 @wireprotocommand('listkeys', 'namespace')
919 def listkeys(repo, proto, namespace):
879 def listkeys(repo, proto, namespace):
920 d = repo.listkeys(encoding.tolocal(namespace)).items()
880 d = repo.listkeys(encoding.tolocal(namespace)).items()
921 return pushkeymod.encodekeys(d)
881 return pushkeymod.encodekeys(d)
922
882
923 @wireprotocommand('lookup', 'key')
883 @wireprotocommand('lookup', 'key')
924 def lookup(repo, proto, key):
884 def lookup(repo, proto, key):
925 try:
885 try:
926 k = encoding.tolocal(key)
886 k = encoding.tolocal(key)
927 c = repo[k]
887 c = repo[k]
928 r = c.hex()
888 r = c.hex()
929 success = 1
889 success = 1
930 except Exception as inst:
890 except Exception as inst:
931 r = str(inst)
891 r = str(inst)
932 success = 0
892 success = 0
933 return "%d %s\n" % (success, r)
893 return "%d %s\n" % (success, r)
934
894
935 @wireprotocommand('known', 'nodes *')
895 @wireprotocommand('known', 'nodes *')
936 def known(repo, proto, nodes, others):
896 def known(repo, proto, nodes, others):
937 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
897 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
938
898
939 @wireprotocommand('pushkey', 'namespace key old new')
899 @wireprotocommand('pushkey', 'namespace key old new')
940 def pushkey(repo, proto, namespace, key, old, new):
900 def pushkey(repo, proto, namespace, key, old, new):
941 # compatibility with pre-1.8 clients which were accidentally
901 # compatibility with pre-1.8 clients which were accidentally
942 # sending raw binary nodes rather than utf-8-encoded hex
902 # sending raw binary nodes rather than utf-8-encoded hex
943 if len(new) == 20 and util.escapestr(new) != new:
903 if len(new) == 20 and util.escapestr(new) != new:
944 # looks like it could be a binary node
904 # looks like it could be a binary node
945 try:
905 try:
946 new.decode('utf-8')
906 new.decode('utf-8')
947 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
907 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
948 except UnicodeDecodeError:
908 except UnicodeDecodeError:
949 pass # binary, leave unmodified
909 pass # binary, leave unmodified
950 else:
910 else:
951 new = encoding.tolocal(new) # normal path
911 new = encoding.tolocal(new) # normal path
952
912
953 if util.safehasattr(proto, 'restore'):
913 if util.safehasattr(proto, 'restore'):
954
914
955 proto.redirect()
915 proto.redirect()
956
916
957 try:
917 try:
958 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
918 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
959 encoding.tolocal(old), new) or False
919 encoding.tolocal(old), new) or False
960 except error.Abort:
920 except error.Abort:
961 r = False
921 r = False
962
922
963 output = proto.restore()
923 output = proto.restore()
964
924
965 return '%s\n%s' % (int(r), output)
925 return '%s\n%s' % (int(r), output)
966
926
967 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
927 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
968 encoding.tolocal(old), new)
928 encoding.tolocal(old), new)
969 return '%s\n' % int(r)
929 return '%s\n' % int(r)
970
930
971 @wireprotocommand('stream_out')
931 @wireprotocommand('stream_out')
972 def stream(repo, proto):
932 def stream(repo, proto):
973 '''If the server supports streaming clone, it advertises the "stream"
933 '''If the server supports streaming clone, it advertises the "stream"
974 capability with a value representing the version and flags of the repo
934 capability with a value representing the version and flags of the repo
975 it is serving. Client checks to see if it understands the format.
935 it is serving. Client checks to see if it understands the format.
976 '''
936 '''
977 return streamres_legacy(streamclone.generatev1wireproto(repo))
937 return streamres_legacy(streamclone.generatev1wireproto(repo))
978
938
979 @wireprotocommand('unbundle', 'heads')
939 @wireprotocommand('unbundle', 'heads')
980 def unbundle(repo, proto, heads):
940 def unbundle(repo, proto, heads):
981 their_heads = decodelist(heads)
941 their_heads = decodelist(heads)
982
942
983 try:
943 try:
984 proto.redirect()
944 proto.redirect()
985
945
986 exchange.check_heads(repo, their_heads, 'preparing changes')
946 exchange.check_heads(repo, their_heads, 'preparing changes')
987
947
988 # write bundle data to temporary file because it can be big
948 # write bundle data to temporary file because it can be big
989 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
949 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
990 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
950 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
991 r = 0
951 r = 0
992 try:
952 try:
993 proto.getfile(fp)
953 proto.getfile(fp)
994 fp.seek(0)
954 fp.seek(0)
995 gen = exchange.readbundle(repo.ui, fp, None)
955 gen = exchange.readbundle(repo.ui, fp, None)
996 if (isinstance(gen, changegroupmod.cg1unpacker)
956 if (isinstance(gen, changegroupmod.cg1unpacker)
997 and not bundle1allowed(repo, 'push')):
957 and not bundle1allowed(repo, 'push')):
998 if proto.name == 'http':
958 if proto.name == 'http':
999 # need to special case http because stderr do not get to
959 # need to special case http because stderr do not get to
1000 # the http client on failed push so we need to abuse some
960 # the http client on failed push so we need to abuse some
1001 # other error type to make sure the message get to the
961 # other error type to make sure the message get to the
1002 # user.
962 # user.
1003 return ooberror(bundle2required)
963 return ooberror(bundle2required)
1004 raise error.Abort(bundle2requiredmain,
964 raise error.Abort(bundle2requiredmain,
1005 hint=bundle2requiredhint)
965 hint=bundle2requiredhint)
1006
966
1007 r = exchange.unbundle(repo, gen, their_heads, 'serve',
967 r = exchange.unbundle(repo, gen, their_heads, 'serve',
1008 proto._client())
968 proto._client())
1009 if util.safehasattr(r, 'addpart'):
969 if util.safehasattr(r, 'addpart'):
1010 # The return looks streamable, we are in the bundle2 case and
970 # The return looks streamable, we are in the bundle2 case and
1011 # should return a stream.
971 # should return a stream.
1012 return streamres_legacy(gen=r.getchunks())
972 return streamres_legacy(gen=r.getchunks())
1013 return pushres(r)
973 return pushres(r)
1014
974
1015 finally:
975 finally:
1016 fp.close()
976 fp.close()
1017 os.unlink(tempname)
977 os.unlink(tempname)
1018
978
1019 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
979 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1020 # handle non-bundle2 case first
980 # handle non-bundle2 case first
1021 if not getattr(exc, 'duringunbundle2', False):
981 if not getattr(exc, 'duringunbundle2', False):
1022 try:
982 try:
1023 raise
983 raise
1024 except error.Abort:
984 except error.Abort:
1025 # The old code we moved used util.stderr directly.
985 # The old code we moved used util.stderr directly.
1026 # We did not change it to minimise code change.
986 # We did not change it to minimise code change.
1027 # This need to be moved to something proper.
987 # This need to be moved to something proper.
1028 # Feel free to do it.
988 # Feel free to do it.
1029 util.stderr.write("abort: %s\n" % exc)
989 util.stderr.write("abort: %s\n" % exc)
1030 if exc.hint is not None:
990 if exc.hint is not None:
1031 util.stderr.write("(%s)\n" % exc.hint)
991 util.stderr.write("(%s)\n" % exc.hint)
1032 return pushres(0)
992 return pushres(0)
1033 except error.PushRaced:
993 except error.PushRaced:
1034 return pusherr(str(exc))
994 return pusherr(str(exc))
1035
995
1036 bundler = bundle2.bundle20(repo.ui)
996 bundler = bundle2.bundle20(repo.ui)
1037 for out in getattr(exc, '_bundle2salvagedoutput', ()):
997 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1038 bundler.addpart(out)
998 bundler.addpart(out)
1039 try:
999 try:
1040 try:
1000 try:
1041 raise
1001 raise
1042 except error.PushkeyFailed as exc:
1002 except error.PushkeyFailed as exc:
1043 # check client caps
1003 # check client caps
1044 remotecaps = getattr(exc, '_replycaps', None)
1004 remotecaps = getattr(exc, '_replycaps', None)
1045 if (remotecaps is not None
1005 if (remotecaps is not None
1046 and 'pushkey' not in remotecaps.get('error', ())):
1006 and 'pushkey' not in remotecaps.get('error', ())):
1047 # no support remote side, fallback to Abort handler.
1007 # no support remote side, fallback to Abort handler.
1048 raise
1008 raise
1049 part = bundler.newpart('error:pushkey')
1009 part = bundler.newpart('error:pushkey')
1050 part.addparam('in-reply-to', exc.partid)
1010 part.addparam('in-reply-to', exc.partid)
1051 if exc.namespace is not None:
1011 if exc.namespace is not None:
1052 part.addparam('namespace', exc.namespace, mandatory=False)
1012 part.addparam('namespace', exc.namespace, mandatory=False)
1053 if exc.key is not None:
1013 if exc.key is not None:
1054 part.addparam('key', exc.key, mandatory=False)
1014 part.addparam('key', exc.key, mandatory=False)
1055 if exc.new is not None:
1015 if exc.new is not None:
1056 part.addparam('new', exc.new, mandatory=False)
1016 part.addparam('new', exc.new, mandatory=False)
1057 if exc.old is not None:
1017 if exc.old is not None:
1058 part.addparam('old', exc.old, mandatory=False)
1018 part.addparam('old', exc.old, mandatory=False)
1059 if exc.ret is not None:
1019 if exc.ret is not None:
1060 part.addparam('ret', exc.ret, mandatory=False)
1020 part.addparam('ret', exc.ret, mandatory=False)
1061 except error.BundleValueError as exc:
1021 except error.BundleValueError as exc:
1062 errpart = bundler.newpart('error:unsupportedcontent')
1022 errpart = bundler.newpart('error:unsupportedcontent')
1063 if exc.parttype is not None:
1023 if exc.parttype is not None:
1064 errpart.addparam('parttype', exc.parttype)
1024 errpart.addparam('parttype', exc.parttype)
1065 if exc.params:
1025 if exc.params:
1066 errpart.addparam('params', '\0'.join(exc.params))
1026 errpart.addparam('params', '\0'.join(exc.params))
1067 except error.Abort as exc:
1027 except error.Abort as exc:
1068 manargs = [('message', str(exc))]
1028 manargs = [('message', str(exc))]
1069 advargs = []
1029 advargs = []
1070 if exc.hint is not None:
1030 if exc.hint is not None:
1071 advargs.append(('hint', exc.hint))
1031 advargs.append(('hint', exc.hint))
1072 bundler.addpart(bundle2.bundlepart('error:abort',
1032 bundler.addpart(bundle2.bundlepart('error:abort',
1073 manargs, advargs))
1033 manargs, advargs))
1074 except error.PushRaced as exc:
1034 except error.PushRaced as exc:
1075 bundler.newpart('error:pushraced', [('message', str(exc))])
1035 bundler.newpart('error:pushraced', [('message', str(exc))])
1076 return streamres_legacy(gen=bundler.getchunks())
1036 return streamres_legacy(gen=bundler.getchunks())
@@ -1,314 +1,354 b''
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
1 # Copyright 21 May 2005 - (c) 2005 Jake Edge <jake@edge2.net>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
2 # Copyright 2005-2007 Matt Mackall <mpm@selenic.com>
3 #
3 #
4 # This software may be used and distributed according to the terms of the
4 # This software may be used and distributed according to the terms of the
5 # GNU General Public License version 2 or any later version.
5 # GNU General Public License version 2 or any later version.
6
6
7 from __future__ import absolute_import
7 from __future__ import absolute_import
8
8
9 import cgi
9 import cgi
10 import struct
10 import struct
11 import sys
11 import sys
12
12
13 from .i18n import _
13 from .i18n import _
14 from . import (
14 from . import (
15 encoding,
15 encoding,
16 error,
16 error,
17 hook,
17 hook,
18 pycompat,
18 pycompat,
19 util,
19 util,
20 wireproto,
20 wireproto,
21 )
21 )
22
22
23 stringio = util.stringio
23 stringio = util.stringio
24
24
25 urlerr = util.urlerr
25 urlerr = util.urlerr
26 urlreq = util.urlreq
26 urlreq = util.urlreq
27
27
28 HTTP_OK = 200
28 HTTP_OK = 200
29
29
30 HGTYPE = 'application/mercurial-0.1'
30 HGTYPE = 'application/mercurial-0.1'
31 HGTYPE2 = 'application/mercurial-0.2'
31 HGTYPE2 = 'application/mercurial-0.2'
32 HGERRTYPE = 'application/hg-error'
32 HGERRTYPE = 'application/hg-error'
33
33
34 class abstractserverproto(object):
35 """abstract class that summarizes the protocol API
36
37 Used as reference and documentation.
38 """
39
40 def getargs(self, args):
41 """return the value for arguments in <args>
42
43 returns a list of values (same order as <args>)"""
44 raise NotImplementedError()
45
46 def getfile(self, fp):
47 """write the whole content of a file into a file like object
48
49 The file is in the form::
50
51 (<chunk-size>\n<chunk>)+0\n
52
53 chunk size is the ascii version of the int.
54 """
55 raise NotImplementedError()
56
57 def redirect(self):
58 """may setup interception for stdout and stderr
59
60 See also the `restore` method."""
61 raise NotImplementedError()
62
63 # If the `redirect` function does install interception, the `restore`
64 # function MUST be defined. If interception is not used, this function
65 # MUST NOT be defined.
66 #
67 # left commented here on purpose
68 #
69 #def restore(self):
70 # """reinstall previous stdout and stderr and return intercepted stdout
71 # """
72 # raise NotImplementedError()
73
34 def decodevaluefromheaders(req, headerprefix):
74 def decodevaluefromheaders(req, headerprefix):
35 """Decode a long value from multiple HTTP request headers.
75 """Decode a long value from multiple HTTP request headers.
36
76
37 Returns the value as a bytes, not a str.
77 Returns the value as a bytes, not a str.
38 """
78 """
39 chunks = []
79 chunks = []
40 i = 1
80 i = 1
41 prefix = headerprefix.upper().replace(r'-', r'_')
81 prefix = headerprefix.upper().replace(r'-', r'_')
42 while True:
82 while True:
43 v = req.env.get(r'HTTP_%s_%d' % (prefix, i))
83 v = req.env.get(r'HTTP_%s_%d' % (prefix, i))
44 if v is None:
84 if v is None:
45 break
85 break
46 chunks.append(pycompat.bytesurl(v))
86 chunks.append(pycompat.bytesurl(v))
47 i += 1
87 i += 1
48
88
49 return ''.join(chunks)
89 return ''.join(chunks)
50
90
51 class webproto(wireproto.abstractserverproto):
91 class webproto(abstractserverproto):
52 def __init__(self, req, ui):
92 def __init__(self, req, ui):
53 self.req = req
93 self.req = req
54 self.response = ''
94 self.response = ''
55 self.ui = ui
95 self.ui = ui
56 self.name = 'http'
96 self.name = 'http'
57
97
58 def getargs(self, args):
98 def getargs(self, args):
59 knownargs = self._args()
99 knownargs = self._args()
60 data = {}
100 data = {}
61 keys = args.split()
101 keys = args.split()
62 for k in keys:
102 for k in keys:
63 if k == '*':
103 if k == '*':
64 star = {}
104 star = {}
65 for key in knownargs.keys():
105 for key in knownargs.keys():
66 if key != 'cmd' and key not in keys:
106 if key != 'cmd' and key not in keys:
67 star[key] = knownargs[key][0]
107 star[key] = knownargs[key][0]
68 data['*'] = star
108 data['*'] = star
69 else:
109 else:
70 data[k] = knownargs[k][0]
110 data[k] = knownargs[k][0]
71 return [data[k] for k in keys]
111 return [data[k] for k in keys]
72 def _args(self):
112 def _args(self):
73 args = self.req.form.copy()
113 args = self.req.form.copy()
74 if pycompat.ispy3:
114 if pycompat.ispy3:
75 args = {k.encode('ascii'): [v.encode('ascii') for v in vs]
115 args = {k.encode('ascii'): [v.encode('ascii') for v in vs]
76 for k, vs in args.items()}
116 for k, vs in args.items()}
77 postlen = int(self.req.env.get(r'HTTP_X_HGARGS_POST', 0))
117 postlen = int(self.req.env.get(r'HTTP_X_HGARGS_POST', 0))
78 if postlen:
118 if postlen:
79 args.update(cgi.parse_qs(
119 args.update(cgi.parse_qs(
80 self.req.read(postlen), keep_blank_values=True))
120 self.req.read(postlen), keep_blank_values=True))
81 return args
121 return args
82
122
83 argvalue = decodevaluefromheaders(self.req, r'X-HgArg')
123 argvalue = decodevaluefromheaders(self.req, r'X-HgArg')
84 args.update(cgi.parse_qs(argvalue, keep_blank_values=True))
124 args.update(cgi.parse_qs(argvalue, keep_blank_values=True))
85 return args
125 return args
86 def getfile(self, fp):
126 def getfile(self, fp):
87 length = int(self.req.env[r'CONTENT_LENGTH'])
127 length = int(self.req.env[r'CONTENT_LENGTH'])
88 # If httppostargs is used, we need to read Content-Length
128 # If httppostargs is used, we need to read Content-Length
89 # minus the amount that was consumed by args.
129 # minus the amount that was consumed by args.
90 length -= int(self.req.env.get(r'HTTP_X_HGARGS_POST', 0))
130 length -= int(self.req.env.get(r'HTTP_X_HGARGS_POST', 0))
91 for s in util.filechunkiter(self.req, limit=length):
131 for s in util.filechunkiter(self.req, limit=length):
92 fp.write(s)
132 fp.write(s)
93 def redirect(self):
133 def redirect(self):
94 self.oldio = self.ui.fout, self.ui.ferr
134 self.oldio = self.ui.fout, self.ui.ferr
95 self.ui.ferr = self.ui.fout = stringio()
135 self.ui.ferr = self.ui.fout = stringio()
96 def restore(self):
136 def restore(self):
97 val = self.ui.fout.getvalue()
137 val = self.ui.fout.getvalue()
98 self.ui.ferr, self.ui.fout = self.oldio
138 self.ui.ferr, self.ui.fout = self.oldio
99 return val
139 return val
100
140
101 def _client(self):
141 def _client(self):
102 return 'remote:%s:%s:%s' % (
142 return 'remote:%s:%s:%s' % (
103 self.req.env.get('wsgi.url_scheme') or 'http',
143 self.req.env.get('wsgi.url_scheme') or 'http',
104 urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
144 urlreq.quote(self.req.env.get('REMOTE_HOST', '')),
105 urlreq.quote(self.req.env.get('REMOTE_USER', '')))
145 urlreq.quote(self.req.env.get('REMOTE_USER', '')))
106
146
107 def responsetype(self, prefer_uncompressed):
147 def responsetype(self, prefer_uncompressed):
108 """Determine the appropriate response type and compression settings.
148 """Determine the appropriate response type and compression settings.
109
149
110 Returns a tuple of (mediatype, compengine, engineopts).
150 Returns a tuple of (mediatype, compengine, engineopts).
111 """
151 """
112 # Determine the response media type and compression engine based
152 # Determine the response media type and compression engine based
113 # on the request parameters.
153 # on the request parameters.
114 protocaps = decodevaluefromheaders(self.req, r'X-HgProto').split(' ')
154 protocaps = decodevaluefromheaders(self.req, r'X-HgProto').split(' ')
115
155
116 if '0.2' in protocaps:
156 if '0.2' in protocaps:
117 # All clients are expected to support uncompressed data.
157 # All clients are expected to support uncompressed data.
118 if prefer_uncompressed:
158 if prefer_uncompressed:
119 return HGTYPE2, util._noopengine(), {}
159 return HGTYPE2, util._noopengine(), {}
120
160
121 # Default as defined by wire protocol spec.
161 # Default as defined by wire protocol spec.
122 compformats = ['zlib', 'none']
162 compformats = ['zlib', 'none']
123 for cap in protocaps:
163 for cap in protocaps:
124 if cap.startswith('comp='):
164 if cap.startswith('comp='):
125 compformats = cap[5:].split(',')
165 compformats = cap[5:].split(',')
126 break
166 break
127
167
128 # Now find an agreed upon compression format.
168 # Now find an agreed upon compression format.
129 for engine in wireproto.supportedcompengines(self.ui, self,
169 for engine in wireproto.supportedcompengines(self.ui, self,
130 util.SERVERROLE):
170 util.SERVERROLE):
131 if engine.wireprotosupport().name in compformats:
171 if engine.wireprotosupport().name in compformats:
132 opts = {}
172 opts = {}
133 level = self.ui.configint('server',
173 level = self.ui.configint('server',
134 '%slevel' % engine.name())
174 '%slevel' % engine.name())
135 if level is not None:
175 if level is not None:
136 opts['level'] = level
176 opts['level'] = level
137
177
138 return HGTYPE2, engine, opts
178 return HGTYPE2, engine, opts
139
179
140 # No mutually supported compression format. Fall back to the
180 # No mutually supported compression format. Fall back to the
141 # legacy protocol.
181 # legacy protocol.
142
182
143 # Don't allow untrusted settings because disabling compression or
183 # Don't allow untrusted settings because disabling compression or
144 # setting a very high compression level could lead to flooding
184 # setting a very high compression level could lead to flooding
145 # the server's network or CPU.
185 # the server's network or CPU.
146 opts = {'level': self.ui.configint('server', 'zliblevel')}
186 opts = {'level': self.ui.configint('server', 'zliblevel')}
147 return HGTYPE, util.compengines['zlib'], opts
187 return HGTYPE, util.compengines['zlib'], opts
148
188
149 def iscmd(cmd):
189 def iscmd(cmd):
150 return cmd in wireproto.commands
190 return cmd in wireproto.commands
151
191
152 def callhttp(repo, req, cmd):
192 def callhttp(repo, req, cmd):
153 p = webproto(req, repo.ui)
193 p = webproto(req, repo.ui)
154
194
155 def genversion2(gen, engine, engineopts):
195 def genversion2(gen, engine, engineopts):
156 # application/mercurial-0.2 always sends a payload header
196 # application/mercurial-0.2 always sends a payload header
157 # identifying the compression engine.
197 # identifying the compression engine.
158 name = engine.wireprotosupport().name
198 name = engine.wireprotosupport().name
159 assert 0 < len(name) < 256
199 assert 0 < len(name) < 256
160 yield struct.pack('B', len(name))
200 yield struct.pack('B', len(name))
161 yield name
201 yield name
162
202
163 for chunk in gen:
203 for chunk in gen:
164 yield chunk
204 yield chunk
165
205
166 rsp = wireproto.dispatch(repo, p, cmd)
206 rsp = wireproto.dispatch(repo, p, cmd)
167 if isinstance(rsp, bytes):
207 if isinstance(rsp, bytes):
168 req.respond(HTTP_OK, HGTYPE, body=rsp)
208 req.respond(HTTP_OK, HGTYPE, body=rsp)
169 return []
209 return []
170 elif isinstance(rsp, wireproto.streamres_legacy):
210 elif isinstance(rsp, wireproto.streamres_legacy):
171 gen = rsp.gen
211 gen = rsp.gen
172 req.respond(HTTP_OK, HGTYPE)
212 req.respond(HTTP_OK, HGTYPE)
173 return gen
213 return gen
174 elif isinstance(rsp, wireproto.streamres):
214 elif isinstance(rsp, wireproto.streamres):
175 gen = rsp.gen
215 gen = rsp.gen
176
216
177 # This code for compression should not be streamres specific. It
217 # This code for compression should not be streamres specific. It
178 # is here because we only compress streamres at the moment.
218 # is here because we only compress streamres at the moment.
179 mediatype, engine, engineopts = p.responsetype(rsp.prefer_uncompressed)
219 mediatype, engine, engineopts = p.responsetype(rsp.prefer_uncompressed)
180 gen = engine.compressstream(gen, engineopts)
220 gen = engine.compressstream(gen, engineopts)
181
221
182 if mediatype == HGTYPE2:
222 if mediatype == HGTYPE2:
183 gen = genversion2(gen, engine, engineopts)
223 gen = genversion2(gen, engine, engineopts)
184
224
185 req.respond(HTTP_OK, mediatype)
225 req.respond(HTTP_OK, mediatype)
186 return gen
226 return gen
187 elif isinstance(rsp, wireproto.pushres):
227 elif isinstance(rsp, wireproto.pushres):
188 val = p.restore()
228 val = p.restore()
189 rsp = '%d\n%s' % (rsp.res, val)
229 rsp = '%d\n%s' % (rsp.res, val)
190 req.respond(HTTP_OK, HGTYPE, body=rsp)
230 req.respond(HTTP_OK, HGTYPE, body=rsp)
191 return []
231 return []
192 elif isinstance(rsp, wireproto.pusherr):
232 elif isinstance(rsp, wireproto.pusherr):
193 # drain the incoming bundle
233 # drain the incoming bundle
194 req.drain()
234 req.drain()
195 p.restore()
235 p.restore()
196 rsp = '0\n%s\n' % rsp.res
236 rsp = '0\n%s\n' % rsp.res
197 req.respond(HTTP_OK, HGTYPE, body=rsp)
237 req.respond(HTTP_OK, HGTYPE, body=rsp)
198 return []
238 return []
199 elif isinstance(rsp, wireproto.ooberror):
239 elif isinstance(rsp, wireproto.ooberror):
200 rsp = rsp.message
240 rsp = rsp.message
201 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
241 req.respond(HTTP_OK, HGERRTYPE, body=rsp)
202 return []
242 return []
203 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
243 raise error.ProgrammingError('hgweb.protocol internal failure', rsp)
204
244
205 class sshserver(wireproto.abstractserverproto):
245 class sshserver(abstractserverproto):
206 def __init__(self, ui, repo):
246 def __init__(self, ui, repo):
207 self.ui = ui
247 self.ui = ui
208 self.repo = repo
248 self.repo = repo
209 self.lock = None
249 self.lock = None
210 self.fin = ui.fin
250 self.fin = ui.fin
211 self.fout = ui.fout
251 self.fout = ui.fout
212 self.name = 'ssh'
252 self.name = 'ssh'
213
253
214 hook.redirect(True)
254 hook.redirect(True)
215 ui.fout = repo.ui.fout = ui.ferr
255 ui.fout = repo.ui.fout = ui.ferr
216
256
217 # Prevent insertion/deletion of CRs
257 # Prevent insertion/deletion of CRs
218 util.setbinary(self.fin)
258 util.setbinary(self.fin)
219 util.setbinary(self.fout)
259 util.setbinary(self.fout)
220
260
221 def getargs(self, args):
261 def getargs(self, args):
222 data = {}
262 data = {}
223 keys = args.split()
263 keys = args.split()
224 for n in xrange(len(keys)):
264 for n in xrange(len(keys)):
225 argline = self.fin.readline()[:-1]
265 argline = self.fin.readline()[:-1]
226 arg, l = argline.split()
266 arg, l = argline.split()
227 if arg not in keys:
267 if arg not in keys:
228 raise error.Abort(_("unexpected parameter %r") % arg)
268 raise error.Abort(_("unexpected parameter %r") % arg)
229 if arg == '*':
269 if arg == '*':
230 star = {}
270 star = {}
231 for k in xrange(int(l)):
271 for k in xrange(int(l)):
232 argline = self.fin.readline()[:-1]
272 argline = self.fin.readline()[:-1]
233 arg, l = argline.split()
273 arg, l = argline.split()
234 val = self.fin.read(int(l))
274 val = self.fin.read(int(l))
235 star[arg] = val
275 star[arg] = val
236 data['*'] = star
276 data['*'] = star
237 else:
277 else:
238 val = self.fin.read(int(l))
278 val = self.fin.read(int(l))
239 data[arg] = val
279 data[arg] = val
240 return [data[k] for k in keys]
280 return [data[k] for k in keys]
241
281
242 def getarg(self, name):
282 def getarg(self, name):
243 return self.getargs(name)[0]
283 return self.getargs(name)[0]
244
284
245 def getfile(self, fpout):
285 def getfile(self, fpout):
246 self.sendresponse('')
286 self.sendresponse('')
247 count = int(self.fin.readline())
287 count = int(self.fin.readline())
248 while count:
288 while count:
249 fpout.write(self.fin.read(count))
289 fpout.write(self.fin.read(count))
250 count = int(self.fin.readline())
290 count = int(self.fin.readline())
251
291
252 def redirect(self):
292 def redirect(self):
253 pass
293 pass
254
294
255 def sendresponse(self, v):
295 def sendresponse(self, v):
256 self.fout.write("%d\n" % len(v))
296 self.fout.write("%d\n" % len(v))
257 self.fout.write(v)
297 self.fout.write(v)
258 self.fout.flush()
298 self.fout.flush()
259
299
260 def sendstream(self, source):
300 def sendstream(self, source):
261 write = self.fout.write
301 write = self.fout.write
262 for chunk in source.gen:
302 for chunk in source.gen:
263 write(chunk)
303 write(chunk)
264 self.fout.flush()
304 self.fout.flush()
265
305
266 def sendpushresponse(self, rsp):
306 def sendpushresponse(self, rsp):
267 self.sendresponse('')
307 self.sendresponse('')
268 self.sendresponse(str(rsp.res))
308 self.sendresponse(str(rsp.res))
269
309
270 def sendpusherror(self, rsp):
310 def sendpusherror(self, rsp):
271 self.sendresponse(rsp.res)
311 self.sendresponse(rsp.res)
272
312
273 def sendooberror(self, rsp):
313 def sendooberror(self, rsp):
274 self.ui.ferr.write('%s\n-\n' % rsp.message)
314 self.ui.ferr.write('%s\n-\n' % rsp.message)
275 self.ui.ferr.flush()
315 self.ui.ferr.flush()
276 self.fout.write('\n')
316 self.fout.write('\n')
277 self.fout.flush()
317 self.fout.flush()
278
318
279 def serve_forever(self):
319 def serve_forever(self):
280 try:
320 try:
281 while self.serve_one():
321 while self.serve_one():
282 pass
322 pass
283 finally:
323 finally:
284 if self.lock is not None:
324 if self.lock is not None:
285 self.lock.release()
325 self.lock.release()
286 sys.exit(0)
326 sys.exit(0)
287
327
288 handlers = {
328 handlers = {
289 str: sendresponse,
329 str: sendresponse,
290 wireproto.streamres: sendstream,
330 wireproto.streamres: sendstream,
291 wireproto.streamres_legacy: sendstream,
331 wireproto.streamres_legacy: sendstream,
292 wireproto.pushres: sendpushresponse,
332 wireproto.pushres: sendpushresponse,
293 wireproto.pusherr: sendpusherror,
333 wireproto.pusherr: sendpusherror,
294 wireproto.ooberror: sendooberror,
334 wireproto.ooberror: sendooberror,
295 }
335 }
296
336
297 def serve_one(self):
337 def serve_one(self):
298 cmd = self.fin.readline()[:-1]
338 cmd = self.fin.readline()[:-1]
299 if cmd and cmd in wireproto.commands:
339 if cmd and cmd in wireproto.commands:
300 rsp = wireproto.dispatch(self.repo, self, cmd)
340 rsp = wireproto.dispatch(self.repo, self, cmd)
301 self.handlers[rsp.__class__](self, rsp)
341 self.handlers[rsp.__class__](self, rsp)
302 elif cmd:
342 elif cmd:
303 impl = getattr(self, 'do_' + cmd, None)
343 impl = getattr(self, 'do_' + cmd, None)
304 if impl:
344 if impl:
305 r = impl()
345 r = impl()
306 if r is not None:
346 if r is not None:
307 self.sendresponse(r)
347 self.sendresponse(r)
308 else:
348 else:
309 self.sendresponse("")
349 self.sendresponse("")
310 return cmd != ''
350 return cmd != ''
311
351
312 def _client(self):
352 def _client(self):
313 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
353 client = encoding.environ.get('SSH_CLIENT', '').split(' ', 1)[0]
314 return 'remote:ssh:' + client
354 return 'remote:ssh:' + client
General Comments 0
You need to be logged in to leave comments. Login now