##// END OF EJS Templates
wireproto: remove unused code...
Jun Wu -
r31073:2cf1e520 default
parent child Browse files
Show More
@@ -1,1051 +1,1050
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import itertools
11 import itertools
12 import os
12 import os
13 import tempfile
13 import tempfile
14
14
15 from .i18n import _
15 from .i18n import _
16 from .node import (
16 from .node import (
17 bin,
17 bin,
18 hex,
18 hex,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 encoding,
24 encoding,
25 error,
25 error,
26 exchange,
26 exchange,
27 peer,
27 peer,
28 pushkey as pushkeymod,
28 pushkey as pushkeymod,
29 pycompat,
29 pycompat,
30 streamclone,
30 streamclone,
31 util,
31 util,
32 )
32 )
33
33
34 urlerr = util.urlerr
34 urlerr = util.urlerr
35 urlreq = util.urlreq
35 urlreq = util.urlreq
36
36
37 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
37 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
38 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
38 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
39 'IncompatibleClient')
39 'IncompatibleClient')
40 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
40 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
41
41
42 class abstractserverproto(object):
42 class abstractserverproto(object):
43 """abstract class that summarizes the protocol API
43 """abstract class that summarizes the protocol API
44
44
45 Used as reference and documentation.
45 Used as reference and documentation.
46 """
46 """
47
47
48 def getargs(self, args):
48 def getargs(self, args):
49 """return the value for arguments in <args>
49 """return the value for arguments in <args>
50
50
51 returns a list of values (same order as <args>)"""
51 returns a list of values (same order as <args>)"""
52 raise NotImplementedError()
52 raise NotImplementedError()
53
53
54 def getfile(self, fp):
54 def getfile(self, fp):
55 """write the whole content of a file into a file like object
55 """write the whole content of a file into a file like object
56
56
57 The file is in the form::
57 The file is in the form::
58
58
59 (<chunk-size>\n<chunk>)+0\n
59 (<chunk-size>\n<chunk>)+0\n
60
60
61 chunk size is the ascii version of the int.
61 chunk size is the ascii version of the int.
62 """
62 """
63 raise NotImplementedError()
63 raise NotImplementedError()
64
64
65 def redirect(self):
65 def redirect(self):
66 """may setup interception for stdout and stderr
66 """may setup interception for stdout and stderr
67
67
68 See also the `restore` method."""
68 See also the `restore` method."""
69 raise NotImplementedError()
69 raise NotImplementedError()
70
70
71 # If the `redirect` function does install interception, the `restore`
71 # If the `redirect` function does install interception, the `restore`
72 # function MUST be defined. If interception is not used, this function
72 # function MUST be defined. If interception is not used, this function
73 # MUST NOT be defined.
73 # MUST NOT be defined.
74 #
74 #
75 # left commented here on purpose
75 # left commented here on purpose
76 #
76 #
77 #def restore(self):
77 #def restore(self):
78 # """reinstall previous stdout and stderr and return intercepted stdout
78 # """reinstall previous stdout and stderr and return intercepted stdout
79 # """
79 # """
80 # raise NotImplementedError()
80 # raise NotImplementedError()
81
81
82 class remotebatch(peer.batcher):
82 class remotebatch(peer.batcher):
83 '''batches the queued calls; uses as few roundtrips as possible'''
83 '''batches the queued calls; uses as few roundtrips as possible'''
84 def __init__(self, remote):
84 def __init__(self, remote):
85 '''remote must support _submitbatch(encbatch) and
85 '''remote must support _submitbatch(encbatch) and
86 _submitone(op, encargs)'''
86 _submitone(op, encargs)'''
87 peer.batcher.__init__(self)
87 peer.batcher.__init__(self)
88 self.remote = remote
88 self.remote = remote
89 def submit(self):
89 def submit(self):
90 req, rsp = [], []
90 req, rsp = [], []
91 for name, args, opts, resref in self.calls:
91 for name, args, opts, resref in self.calls:
92 mtd = getattr(self.remote, name)
92 mtd = getattr(self.remote, name)
93 batchablefn = getattr(mtd, 'batchable', None)
93 batchablefn = getattr(mtd, 'batchable', None)
94 if batchablefn is not None:
94 if batchablefn is not None:
95 batchable = batchablefn(mtd.im_self, *args, **opts)
95 batchable = batchablefn(mtd.im_self, *args, **opts)
96 encargsorres, encresref = next(batchable)
96 encargsorres, encresref = next(batchable)
97 if encresref:
97 if encresref:
98 req.append((name, encargsorres,))
98 req.append((name, encargsorres,))
99 rsp.append((batchable, encresref, resref,))
99 rsp.append((batchable, encresref, resref,))
100 else:
100 else:
101 resref.set(encargsorres)
101 resref.set(encargsorres)
102 else:
102 else:
103 if req:
103 if req:
104 self._submitreq(req, rsp)
104 self._submitreq(req, rsp)
105 req, rsp = [], []
105 req, rsp = [], []
106 resref.set(mtd(*args, **opts))
106 resref.set(mtd(*args, **opts))
107 if req:
107 if req:
108 self._submitreq(req, rsp)
108 self._submitreq(req, rsp)
109 def _submitreq(self, req, rsp):
109 def _submitreq(self, req, rsp):
110 encresults = self.remote._submitbatch(req)
110 encresults = self.remote._submitbatch(req)
111 for encres, r in zip(encresults, rsp):
111 for encres, r in zip(encresults, rsp):
112 batchable, encresref, resref = r
112 batchable, encresref, resref = r
113 encresref.set(encres)
113 encresref.set(encres)
114 resref.set(next(batchable))
114 resref.set(next(batchable))
115
115
116 class remoteiterbatcher(peer.iterbatcher):
116 class remoteiterbatcher(peer.iterbatcher):
117 def __init__(self, remote):
117 def __init__(self, remote):
118 super(remoteiterbatcher, self).__init__()
118 super(remoteiterbatcher, self).__init__()
119 self._remote = remote
119 self._remote = remote
120
120
121 def __getattr__(self, name):
121 def __getattr__(self, name):
122 if not getattr(self._remote, name, False):
122 if not getattr(self._remote, name, False):
123 raise AttributeError(
123 raise AttributeError(
124 'Attempted to iterbatch non-batchable call to %r' % name)
124 'Attempted to iterbatch non-batchable call to %r' % name)
125 return super(remoteiterbatcher, self).__getattr__(name)
125 return super(remoteiterbatcher, self).__getattr__(name)
126
126
127 def submit(self):
127 def submit(self):
128 """Break the batch request into many patch calls and pipeline them.
128 """Break the batch request into many patch calls and pipeline them.
129
129
130 This is mostly valuable over http where request sizes can be
130 This is mostly valuable over http where request sizes can be
131 limited, but can be used in other places as well.
131 limited, but can be used in other places as well.
132 """
132 """
133 req, rsp = [], []
133 req, rsp = [], []
134 for name, args, opts, resref in self.calls:
134 for name, args, opts, resref in self.calls:
135 mtd = getattr(self._remote, name)
135 mtd = getattr(self._remote, name)
136 batchable = mtd.batchable(mtd.im_self, *args, **opts)
136 batchable = mtd.batchable(mtd.im_self, *args, **opts)
137 encargsorres, encresref = next(batchable)
137 encargsorres, encresref = next(batchable)
138 assert encresref
138 assert encresref
139 req.append((name, encargsorres))
139 req.append((name, encargsorres))
140 rsp.append((batchable, encresref))
140 rsp.append((batchable, encresref))
141 if req:
141 if req:
142 self._resultiter = self._remote._submitbatch(req)
142 self._resultiter = self._remote._submitbatch(req)
143 self._rsp = rsp
143 self._rsp = rsp
144
144
145 def results(self):
145 def results(self):
146 for (batchable, encresref), encres in itertools.izip(
146 for (batchable, encresref), encres in itertools.izip(
147 self._rsp, self._resultiter):
147 self._rsp, self._resultiter):
148 encresref.set(encres)
148 encresref.set(encres)
149 yield next(batchable)
149 yield next(batchable)
150
150
151 # Forward a couple of names from peer to make wireproto interactions
151 # Forward a couple of names from peer to make wireproto interactions
152 # slightly more sensible.
152 # slightly more sensible.
153 batchable = peer.batchable
153 batchable = peer.batchable
154 future = peer.future
154 future = peer.future
155
155
156 # list of nodes encoding / decoding
156 # list of nodes encoding / decoding
157
157
158 def decodelist(l, sep=' '):
158 def decodelist(l, sep=' '):
159 if l:
159 if l:
160 return map(bin, l.split(sep))
160 return map(bin, l.split(sep))
161 return []
161 return []
162
162
163 def encodelist(l, sep=' '):
163 def encodelist(l, sep=' '):
164 try:
164 try:
165 return sep.join(map(hex, l))
165 return sep.join(map(hex, l))
166 except TypeError:
166 except TypeError:
167 raise
167 raise
168
168
169 # batched call argument encoding
169 # batched call argument encoding
170
170
171 def escapearg(plain):
171 def escapearg(plain):
172 return (plain
172 return (plain
173 .replace(':', ':c')
173 .replace(':', ':c')
174 .replace(',', ':o')
174 .replace(',', ':o')
175 .replace(';', ':s')
175 .replace(';', ':s')
176 .replace('=', ':e'))
176 .replace('=', ':e'))
177
177
178 def unescapearg(escaped):
178 def unescapearg(escaped):
179 return (escaped
179 return (escaped
180 .replace(':e', '=')
180 .replace(':e', '=')
181 .replace(':s', ';')
181 .replace(':s', ';')
182 .replace(':o', ',')
182 .replace(':o', ',')
183 .replace(':c', ':'))
183 .replace(':c', ':'))
184
184
185 def encodebatchcmds(req):
185 def encodebatchcmds(req):
186 """Return a ``cmds`` argument value for the ``batch`` command."""
186 """Return a ``cmds`` argument value for the ``batch`` command."""
187 cmds = []
187 cmds = []
188 for op, argsdict in req:
188 for op, argsdict in req:
189 # Old servers didn't properly unescape argument names. So prevent
189 # Old servers didn't properly unescape argument names. So prevent
190 # the sending of argument names that may not be decoded properly by
190 # the sending of argument names that may not be decoded properly by
191 # servers.
191 # servers.
192 assert all(escapearg(k) == k for k in argsdict)
192 assert all(escapearg(k) == k for k in argsdict)
193
193
194 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
194 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
195 for k, v in argsdict.iteritems())
195 for k, v in argsdict.iteritems())
196 cmds.append('%s %s' % (op, args))
196 cmds.append('%s %s' % (op, args))
197
197
198 return ';'.join(cmds)
198 return ';'.join(cmds)
199
199
200 # mapping of options accepted by getbundle and their types
200 # mapping of options accepted by getbundle and their types
201 #
201 #
202 # Meant to be extended by extensions. It is extensions responsibility to ensure
202 # Meant to be extended by extensions. It is extensions responsibility to ensure
203 # such options are properly processed in exchange.getbundle.
203 # such options are properly processed in exchange.getbundle.
204 #
204 #
205 # supported types are:
205 # supported types are:
206 #
206 #
207 # :nodes: list of binary nodes
207 # :nodes: list of binary nodes
208 # :csv: list of comma-separated values
208 # :csv: list of comma-separated values
209 # :scsv: list of comma-separated values return as set
209 # :scsv: list of comma-separated values return as set
210 # :plain: string with no transformation needed.
210 # :plain: string with no transformation needed.
211 gboptsmap = {'heads': 'nodes',
211 gboptsmap = {'heads': 'nodes',
212 'common': 'nodes',
212 'common': 'nodes',
213 'obsmarkers': 'boolean',
213 'obsmarkers': 'boolean',
214 'bundlecaps': 'scsv',
214 'bundlecaps': 'scsv',
215 'listkeys': 'csv',
215 'listkeys': 'csv',
216 'cg': 'boolean',
216 'cg': 'boolean',
217 'cbattempted': 'boolean'}
217 'cbattempted': 'boolean'}
218
218
219 # client side
219 # client side
220
220
221 class wirepeer(peer.peerrepository):
221 class wirepeer(peer.peerrepository):
222 """Client-side interface for communicating with a peer repository.
222 """Client-side interface for communicating with a peer repository.
223
223
224 Methods commonly call wire protocol commands of the same name.
224 Methods commonly call wire protocol commands of the same name.
225
225
226 See also httppeer.py and sshpeer.py for protocol-specific
226 See also httppeer.py and sshpeer.py for protocol-specific
227 implementations of this interface.
227 implementations of this interface.
228 """
228 """
229 def batch(self):
229 def batch(self):
230 if self.capable('batch'):
230 if self.capable('batch'):
231 return remotebatch(self)
231 return remotebatch(self)
232 else:
232 else:
233 return peer.localbatch(self)
233 return peer.localbatch(self)
234 def _submitbatch(self, req):
234 def _submitbatch(self, req):
235 """run batch request <req> on the server
235 """run batch request <req> on the server
236
236
237 Returns an iterator of the raw responses from the server.
237 Returns an iterator of the raw responses from the server.
238 """
238 """
239 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
239 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
240 chunk = rsp.read(1024)
240 chunk = rsp.read(1024)
241 work = [chunk]
241 work = [chunk]
242 while chunk:
242 while chunk:
243 while ';' not in chunk and chunk:
243 while ';' not in chunk and chunk:
244 chunk = rsp.read(1024)
244 chunk = rsp.read(1024)
245 work.append(chunk)
245 work.append(chunk)
246 merged = ''.join(work)
246 merged = ''.join(work)
247 while ';' in merged:
247 while ';' in merged:
248 one, merged = merged.split(';', 1)
248 one, merged = merged.split(';', 1)
249 yield unescapearg(one)
249 yield unescapearg(one)
250 chunk = rsp.read(1024)
250 chunk = rsp.read(1024)
251 work = [merged, chunk]
251 work = [merged, chunk]
252 yield unescapearg(''.join(work))
252 yield unescapearg(''.join(work))
253
253
254 def _submitone(self, op, args):
254 def _submitone(self, op, args):
255 return self._call(op, **args)
255 return self._call(op, **args)
256
256
257 def iterbatch(self):
257 def iterbatch(self):
258 return remoteiterbatcher(self)
258 return remoteiterbatcher(self)
259
259
260 @batchable
260 @batchable
261 def lookup(self, key):
261 def lookup(self, key):
262 self.requirecap('lookup', _('look up remote revision'))
262 self.requirecap('lookup', _('look up remote revision'))
263 f = future()
263 f = future()
264 yield {'key': encoding.fromlocal(key)}, f
264 yield {'key': encoding.fromlocal(key)}, f
265 d = f.value
265 d = f.value
266 success, data = d[:-1].split(" ", 1)
266 success, data = d[:-1].split(" ", 1)
267 if int(success):
267 if int(success):
268 yield bin(data)
268 yield bin(data)
269 self._abort(error.RepoError(data))
269 self._abort(error.RepoError(data))
270
270
271 @batchable
271 @batchable
272 def heads(self):
272 def heads(self):
273 f = future()
273 f = future()
274 yield {}, f
274 yield {}, f
275 d = f.value
275 d = f.value
276 try:
276 try:
277 yield decodelist(d[:-1])
277 yield decodelist(d[:-1])
278 except ValueError:
278 except ValueError:
279 self._abort(error.ResponseError(_("unexpected response:"), d))
279 self._abort(error.ResponseError(_("unexpected response:"), d))
280
280
281 @batchable
281 @batchable
282 def known(self, nodes):
282 def known(self, nodes):
283 f = future()
283 f = future()
284 yield {'nodes': encodelist(nodes)}, f
284 yield {'nodes': encodelist(nodes)}, f
285 d = f.value
285 d = f.value
286 try:
286 try:
287 yield [bool(int(b)) for b in d]
287 yield [bool(int(b)) for b in d]
288 except ValueError:
288 except ValueError:
289 self._abort(error.ResponseError(_("unexpected response:"), d))
289 self._abort(error.ResponseError(_("unexpected response:"), d))
290
290
291 @batchable
291 @batchable
292 def branchmap(self):
292 def branchmap(self):
293 f = future()
293 f = future()
294 yield {}, f
294 yield {}, f
295 d = f.value
295 d = f.value
296 try:
296 try:
297 branchmap = {}
297 branchmap = {}
298 for branchpart in d.splitlines():
298 for branchpart in d.splitlines():
299 branchname, branchheads = branchpart.split(' ', 1)
299 branchname, branchheads = branchpart.split(' ', 1)
300 branchname = encoding.tolocal(urlreq.unquote(branchname))
300 branchname = encoding.tolocal(urlreq.unquote(branchname))
301 branchheads = decodelist(branchheads)
301 branchheads = decodelist(branchheads)
302 branchmap[branchname] = branchheads
302 branchmap[branchname] = branchheads
303 yield branchmap
303 yield branchmap
304 except TypeError:
304 except TypeError:
305 self._abort(error.ResponseError(_("unexpected response:"), d))
305 self._abort(error.ResponseError(_("unexpected response:"), d))
306
306
307 def branches(self, nodes):
307 def branches(self, nodes):
308 n = encodelist(nodes)
308 n = encodelist(nodes)
309 d = self._call("branches", nodes=n)
309 d = self._call("branches", nodes=n)
310 try:
310 try:
311 br = [tuple(decodelist(b)) for b in d.splitlines()]
311 br = [tuple(decodelist(b)) for b in d.splitlines()]
312 return br
312 return br
313 except ValueError:
313 except ValueError:
314 self._abort(error.ResponseError(_("unexpected response:"), d))
314 self._abort(error.ResponseError(_("unexpected response:"), d))
315
315
316 def between(self, pairs):
316 def between(self, pairs):
317 batch = 8 # avoid giant requests
317 batch = 8 # avoid giant requests
318 r = []
318 r = []
319 for i in xrange(0, len(pairs), batch):
319 for i in xrange(0, len(pairs), batch):
320 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
320 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
321 d = self._call("between", pairs=n)
321 d = self._call("between", pairs=n)
322 try:
322 try:
323 r.extend(l and decodelist(l) or [] for l in d.splitlines())
323 r.extend(l and decodelist(l) or [] for l in d.splitlines())
324 except ValueError:
324 except ValueError:
325 self._abort(error.ResponseError(_("unexpected response:"), d))
325 self._abort(error.ResponseError(_("unexpected response:"), d))
326 return r
326 return r
327
327
328 @batchable
328 @batchable
329 def pushkey(self, namespace, key, old, new):
329 def pushkey(self, namespace, key, old, new):
330 if not self.capable('pushkey'):
330 if not self.capable('pushkey'):
331 yield False, None
331 yield False, None
332 f = future()
332 f = future()
333 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
333 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
334 yield {'namespace': encoding.fromlocal(namespace),
334 yield {'namespace': encoding.fromlocal(namespace),
335 'key': encoding.fromlocal(key),
335 'key': encoding.fromlocal(key),
336 'old': encoding.fromlocal(old),
336 'old': encoding.fromlocal(old),
337 'new': encoding.fromlocal(new)}, f
337 'new': encoding.fromlocal(new)}, f
338 d = f.value
338 d = f.value
339 d, output = d.split('\n', 1)
339 d, output = d.split('\n', 1)
340 try:
340 try:
341 d = bool(int(d))
341 d = bool(int(d))
342 except ValueError:
342 except ValueError:
343 raise error.ResponseError(
343 raise error.ResponseError(
344 _('push failed (unexpected response):'), d)
344 _('push failed (unexpected response):'), d)
345 for l in output.splitlines(True):
345 for l in output.splitlines(True):
346 self.ui.status(_('remote: '), l)
346 self.ui.status(_('remote: '), l)
347 yield d
347 yield d
348
348
349 @batchable
349 @batchable
350 def listkeys(self, namespace):
350 def listkeys(self, namespace):
351 if not self.capable('pushkey'):
351 if not self.capable('pushkey'):
352 yield {}, None
352 yield {}, None
353 f = future()
353 f = future()
354 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
354 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
355 yield {'namespace': encoding.fromlocal(namespace)}, f
355 yield {'namespace': encoding.fromlocal(namespace)}, f
356 d = f.value
356 d = f.value
357 self.ui.debug('received listkey for "%s": %i bytes\n'
357 self.ui.debug('received listkey for "%s": %i bytes\n'
358 % (namespace, len(d)))
358 % (namespace, len(d)))
359 yield pushkeymod.decodekeys(d)
359 yield pushkeymod.decodekeys(d)
360
360
361 def stream_out(self):
361 def stream_out(self):
362 return self._callstream('stream_out')
362 return self._callstream('stream_out')
363
363
364 def changegroup(self, nodes, kind):
364 def changegroup(self, nodes, kind):
365 n = encodelist(nodes)
365 n = encodelist(nodes)
366 f = self._callcompressable("changegroup", roots=n)
366 f = self._callcompressable("changegroup", roots=n)
367 return changegroupmod.cg1unpacker(f, 'UN')
367 return changegroupmod.cg1unpacker(f, 'UN')
368
368
369 def changegroupsubset(self, bases, heads, kind):
369 def changegroupsubset(self, bases, heads, kind):
370 self.requirecap('changegroupsubset', _('look up remote changes'))
370 self.requirecap('changegroupsubset', _('look up remote changes'))
371 bases = encodelist(bases)
371 bases = encodelist(bases)
372 heads = encodelist(heads)
372 heads = encodelist(heads)
373 f = self._callcompressable("changegroupsubset",
373 f = self._callcompressable("changegroupsubset",
374 bases=bases, heads=heads)
374 bases=bases, heads=heads)
375 return changegroupmod.cg1unpacker(f, 'UN')
375 return changegroupmod.cg1unpacker(f, 'UN')
376
376
377 def getbundle(self, source, **kwargs):
377 def getbundle(self, source, **kwargs):
378 self.requirecap('getbundle', _('look up remote changes'))
378 self.requirecap('getbundle', _('look up remote changes'))
379 opts = {}
379 opts = {}
380 bundlecaps = kwargs.get('bundlecaps')
380 bundlecaps = kwargs.get('bundlecaps')
381 if bundlecaps is not None:
381 if bundlecaps is not None:
382 kwargs['bundlecaps'] = sorted(bundlecaps)
382 kwargs['bundlecaps'] = sorted(bundlecaps)
383 else:
383 else:
384 bundlecaps = () # kwargs could have it to None
384 bundlecaps = () # kwargs could have it to None
385 for key, value in kwargs.iteritems():
385 for key, value in kwargs.iteritems():
386 if value is None:
386 if value is None:
387 continue
387 continue
388 keytype = gboptsmap.get(key)
388 keytype = gboptsmap.get(key)
389 if keytype is None:
389 if keytype is None:
390 assert False, 'unexpected'
390 assert False, 'unexpected'
391 elif keytype == 'nodes':
391 elif keytype == 'nodes':
392 value = encodelist(value)
392 value = encodelist(value)
393 elif keytype in ('csv', 'scsv'):
393 elif keytype in ('csv', 'scsv'):
394 value = ','.join(value)
394 value = ','.join(value)
395 elif keytype == 'boolean':
395 elif keytype == 'boolean':
396 value = '%i' % bool(value)
396 value = '%i' % bool(value)
397 elif keytype != 'plain':
397 elif keytype != 'plain':
398 raise KeyError('unknown getbundle option type %s'
398 raise KeyError('unknown getbundle option type %s'
399 % keytype)
399 % keytype)
400 opts[key] = value
400 opts[key] = value
401 f = self._callcompressable("getbundle", **opts)
401 f = self._callcompressable("getbundle", **opts)
402 if any((cap.startswith('HG2') for cap in bundlecaps)):
402 if any((cap.startswith('HG2') for cap in bundlecaps)):
403 return bundle2.getunbundler(self.ui, f)
403 return bundle2.getunbundler(self.ui, f)
404 else:
404 else:
405 return changegroupmod.cg1unpacker(f, 'UN')
405 return changegroupmod.cg1unpacker(f, 'UN')
406
406
407 def unbundle(self, cg, heads, url):
407 def unbundle(self, cg, heads, url):
408 '''Send cg (a readable file-like object representing the
408 '''Send cg (a readable file-like object representing the
409 changegroup to push, typically a chunkbuffer object) to the
409 changegroup to push, typically a chunkbuffer object) to the
410 remote server as a bundle.
410 remote server as a bundle.
411
411
412 When pushing a bundle10 stream, return an integer indicating the
412 When pushing a bundle10 stream, return an integer indicating the
413 result of the push (see localrepository.addchangegroup()).
413 result of the push (see localrepository.addchangegroup()).
414
414
415 When pushing a bundle20 stream, return a bundle20 stream.
415 When pushing a bundle20 stream, return a bundle20 stream.
416
416
417 `url` is the url the client thinks it's pushing to, which is
417 `url` is the url the client thinks it's pushing to, which is
418 visible to hooks.
418 visible to hooks.
419 '''
419 '''
420
420
421 if heads != ['force'] and self.capable('unbundlehash'):
421 if heads != ['force'] and self.capable('unbundlehash'):
422 heads = encodelist(['hashed',
422 heads = encodelist(['hashed',
423 hashlib.sha1(''.join(sorted(heads))).digest()])
423 hashlib.sha1(''.join(sorted(heads))).digest()])
424 else:
424 else:
425 heads = encodelist(heads)
425 heads = encodelist(heads)
426
426
427 if util.safehasattr(cg, 'deltaheader'):
427 if util.safehasattr(cg, 'deltaheader'):
428 # this a bundle10, do the old style call sequence
428 # this a bundle10, do the old style call sequence
429 ret, output = self._callpush("unbundle", cg, heads=heads)
429 ret, output = self._callpush("unbundle", cg, heads=heads)
430 if ret == "":
430 if ret == "":
431 raise error.ResponseError(
431 raise error.ResponseError(
432 _('push failed:'), output)
432 _('push failed:'), output)
433 try:
433 try:
434 ret = int(ret)
434 ret = int(ret)
435 except ValueError:
435 except ValueError:
436 raise error.ResponseError(
436 raise error.ResponseError(
437 _('push failed (unexpected response):'), ret)
437 _('push failed (unexpected response):'), ret)
438
438
439 for l in output.splitlines(True):
439 for l in output.splitlines(True):
440 self.ui.status(_('remote: '), l)
440 self.ui.status(_('remote: '), l)
441 else:
441 else:
442 # bundle2 push. Send a stream, fetch a stream.
442 # bundle2 push. Send a stream, fetch a stream.
443 stream = self._calltwowaystream('unbundle', cg, heads=heads)
443 stream = self._calltwowaystream('unbundle', cg, heads=heads)
444 ret = bundle2.getunbundler(self.ui, stream)
444 ret = bundle2.getunbundler(self.ui, stream)
445 return ret
445 return ret
446
446
447 def debugwireargs(self, one, two, three=None, four=None, five=None):
447 def debugwireargs(self, one, two, three=None, four=None, five=None):
448 # don't pass optional arguments left at their default value
448 # don't pass optional arguments left at their default value
449 opts = {}
449 opts = {}
450 if three is not None:
450 if three is not None:
451 opts['three'] = three
451 opts['three'] = three
452 if four is not None:
452 if four is not None:
453 opts['four'] = four
453 opts['four'] = four
454 return self._call('debugwireargs', one=one, two=two, **opts)
454 return self._call('debugwireargs', one=one, two=two, **opts)
455
455
456 def _call(self, cmd, **args):
456 def _call(self, cmd, **args):
457 """execute <cmd> on the server
457 """execute <cmd> on the server
458
458
459 The command is expected to return a simple string.
459 The command is expected to return a simple string.
460
460
461 returns the server reply as a string."""
461 returns the server reply as a string."""
462 raise NotImplementedError()
462 raise NotImplementedError()
463
463
464 def _callstream(self, cmd, **args):
464 def _callstream(self, cmd, **args):
465 """execute <cmd> on the server
465 """execute <cmd> on the server
466
466
467 The command is expected to return a stream. Note that if the
467 The command is expected to return a stream. Note that if the
468 command doesn't return a stream, _callstream behaves
468 command doesn't return a stream, _callstream behaves
469 differently for ssh and http peers.
469 differently for ssh and http peers.
470
470
471 returns the server reply as a file like object.
471 returns the server reply as a file like object.
472 """
472 """
473 raise NotImplementedError()
473 raise NotImplementedError()
474
474
475 def _callcompressable(self, cmd, **args):
475 def _callcompressable(self, cmd, **args):
476 """execute <cmd> on the server
476 """execute <cmd> on the server
477
477
478 The command is expected to return a stream.
478 The command is expected to return a stream.
479
479
480 The stream may have been compressed in some implementations. This
480 The stream may have been compressed in some implementations. This
481 function takes care of the decompression. This is the only difference
481 function takes care of the decompression. This is the only difference
482 with _callstream.
482 with _callstream.
483
483
484 returns the server reply as a file like object.
484 returns the server reply as a file like object.
485 """
485 """
486 raise NotImplementedError()
486 raise NotImplementedError()
487
487
488 def _callpush(self, cmd, fp, **args):
488 def _callpush(self, cmd, fp, **args):
489 """execute a <cmd> on server
489 """execute a <cmd> on server
490
490
491 The command is expected to be related to a push. Push has a special
491 The command is expected to be related to a push. Push has a special
492 return method.
492 return method.
493
493
494 returns the server reply as a (ret, output) tuple. ret is either
494 returns the server reply as a (ret, output) tuple. ret is either
495 empty (error) or a stringified int.
495 empty (error) or a stringified int.
496 """
496 """
497 raise NotImplementedError()
497 raise NotImplementedError()
498
498
499 def _calltwowaystream(self, cmd, fp, **args):
499 def _calltwowaystream(self, cmd, fp, **args):
500 """execute <cmd> on server
500 """execute <cmd> on server
501
501
502 The command will send a stream to the server and get a stream in reply.
502 The command will send a stream to the server and get a stream in reply.
503 """
503 """
504 raise NotImplementedError()
504 raise NotImplementedError()
505
505
506 def _abort(self, exception):
506 def _abort(self, exception):
507 """clearly abort the wire protocol connection and raise the exception
507 """clearly abort the wire protocol connection and raise the exception
508 """
508 """
509 raise NotImplementedError()
509 raise NotImplementedError()
510
510
511 # server side
511 # server side
512
512
513 # wire protocol command can either return a string or one of these classes.
513 # wire protocol command can either return a string or one of these classes.
514 class streamres(object):
514 class streamres(object):
515 """wireproto reply: binary stream
515 """wireproto reply: binary stream
516
516
517 The call was successful and the result is a stream.
517 The call was successful and the result is a stream.
518
518
519 Accepts either a generator or an object with a ``read(size)`` method.
519 Accepts either a generator or an object with a ``read(size)`` method.
520
520
521 ``v1compressible`` indicates whether this data can be compressed to
521 ``v1compressible`` indicates whether this data can be compressed to
522 "version 1" clients (technically: HTTP peers using
522 "version 1" clients (technically: HTTP peers using
523 application/mercurial-0.1 media type). This flag should NOT be used on
523 application/mercurial-0.1 media type). This flag should NOT be used on
524 new commands because new clients should support a more modern compression
524 new commands because new clients should support a more modern compression
525 mechanism.
525 mechanism.
526 """
526 """
527 def __init__(self, gen=None, reader=None, v1compressible=False):
527 def __init__(self, gen=None, reader=None, v1compressible=False):
528 self.gen = gen
528 self.gen = gen
529 self.reader = reader
529 self.reader = reader
530 self.v1compressible = v1compressible
530 self.v1compressible = v1compressible
531
531
532 class pushres(object):
532 class pushres(object):
533 """wireproto reply: success with simple integer return
533 """wireproto reply: success with simple integer return
534
534
535 The call was successful and returned an integer contained in `self.res`.
535 The call was successful and returned an integer contained in `self.res`.
536 """
536 """
537 def __init__(self, res):
537 def __init__(self, res):
538 self.res = res
538 self.res = res
539
539
540 class pusherr(object):
540 class pusherr(object):
541 """wireproto reply: failure
541 """wireproto reply: failure
542
542
543 The call failed. The `self.res` attribute contains the error message.
543 The call failed. The `self.res` attribute contains the error message.
544 """
544 """
545 def __init__(self, res):
545 def __init__(self, res):
546 self.res = res
546 self.res = res
547
547
548 class ooberror(object):
548 class ooberror(object):
549 """wireproto reply: failure of a batch of operation
549 """wireproto reply: failure of a batch of operation
550
550
551 Something failed during a batch call. The error message is stored in
551 Something failed during a batch call. The error message is stored in
552 `self.message`.
552 `self.message`.
553 """
553 """
554 def __init__(self, message):
554 def __init__(self, message):
555 self.message = message
555 self.message = message
556
556
557 def getdispatchrepo(repo, proto, command):
557 def getdispatchrepo(repo, proto, command):
558 """Obtain the repo used for processing wire protocol commands.
558 """Obtain the repo used for processing wire protocol commands.
559
559
560 The intent of this function is to serve as a monkeypatch point for
560 The intent of this function is to serve as a monkeypatch point for
561 extensions that need commands to operate on different repo views under
561 extensions that need commands to operate on different repo views under
562 specialized circumstances.
562 specialized circumstances.
563 """
563 """
564 return repo.filtered('served')
564 return repo.filtered('served')
565
565
566 def dispatch(repo, proto, command):
566 def dispatch(repo, proto, command):
567 repo = getdispatchrepo(repo, proto, command)
567 repo = getdispatchrepo(repo, proto, command)
568 func, spec = commands[command]
568 func, spec = commands[command]
569 args = proto.getargs(spec)
569 args = proto.getargs(spec)
570 return func(repo, proto, *args)
570 return func(repo, proto, *args)
571
571
572 def options(cmd, keys, others):
572 def options(cmd, keys, others):
573 opts = {}
573 opts = {}
574 for k in keys:
574 for k in keys:
575 if k in others:
575 if k in others:
576 opts[k] = others[k]
576 opts[k] = others[k]
577 del others[k]
577 del others[k]
578 if others:
578 if others:
579 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
579 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
580 % (cmd, ",".join(others)))
580 % (cmd, ",".join(others)))
581 return opts
581 return opts
582
582
583 def bundle1allowed(repo, action):
583 def bundle1allowed(repo, action):
584 """Whether a bundle1 operation is allowed from the server.
584 """Whether a bundle1 operation is allowed from the server.
585
585
586 Priority is:
586 Priority is:
587
587
588 1. server.bundle1gd.<action> (if generaldelta active)
588 1. server.bundle1gd.<action> (if generaldelta active)
589 2. server.bundle1.<action>
589 2. server.bundle1.<action>
590 3. server.bundle1gd (if generaldelta active)
590 3. server.bundle1gd (if generaldelta active)
591 4. server.bundle1
591 4. server.bundle1
592 """
592 """
593 ui = repo.ui
593 ui = repo.ui
594 gd = 'generaldelta' in repo.requirements
594 gd = 'generaldelta' in repo.requirements
595
595
596 if gd:
596 if gd:
597 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
597 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
598 if v is not None:
598 if v is not None:
599 return v
599 return v
600
600
601 v = ui.configbool('server', 'bundle1.%s' % action, None)
601 v = ui.configbool('server', 'bundle1.%s' % action, None)
602 if v is not None:
602 if v is not None:
603 return v
603 return v
604
604
605 if gd:
605 if gd:
606 v = ui.configbool('server', 'bundle1gd', None)
606 v = ui.configbool('server', 'bundle1gd', None)
607 if v is not None:
607 if v is not None:
608 return v
608 return v
609
609
610 return ui.configbool('server', 'bundle1', True)
610 return ui.configbool('server', 'bundle1', True)
611
611
612 def supportedcompengines(ui, proto, role):
612 def supportedcompengines(ui, proto, role):
613 """Obtain the list of supported compression engines for a request."""
613 """Obtain the list of supported compression engines for a request."""
614 assert role in (util.CLIENTROLE, util.SERVERROLE)
614 assert role in (util.CLIENTROLE, util.SERVERROLE)
615
615
616 compengines = util.compengines.supportedwireengines(role)
616 compengines = util.compengines.supportedwireengines(role)
617
617
618 # Allow config to override default list and ordering.
618 # Allow config to override default list and ordering.
619 if role == util.SERVERROLE:
619 if role == util.SERVERROLE:
620 configengines = ui.configlist('server', 'compressionengines')
620 configengines = ui.configlist('server', 'compressionengines')
621 config = 'server.compressionengines'
621 config = 'server.compressionengines'
622 else:
622 else:
623 # This is currently implemented mainly to facilitate testing. In most
623 # This is currently implemented mainly to facilitate testing. In most
624 # cases, the server should be in charge of choosing a compression engine
624 # cases, the server should be in charge of choosing a compression engine
625 # because a server has the most to lose from a sub-optimal choice. (e.g.
625 # because a server has the most to lose from a sub-optimal choice. (e.g.
626 # CPU DoS due to an expensive engine or a network DoS due to poor
626 # CPU DoS due to an expensive engine or a network DoS due to poor
627 # compression ratio).
627 # compression ratio).
628 configengines = ui.configlist('experimental',
628 configengines = ui.configlist('experimental',
629 'clientcompressionengines')
629 'clientcompressionengines')
630 config = 'experimental.clientcompressionengines'
630 config = 'experimental.clientcompressionengines'
631
631
632 # No explicit config. Filter out the ones that aren't supposed to be
632 # No explicit config. Filter out the ones that aren't supposed to be
633 # advertised and return default ordering.
633 # advertised and return default ordering.
634 if not configengines:
634 if not configengines:
635 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
635 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
636 return [e for e in compengines
636 return [e for e in compengines
637 if getattr(e.wireprotosupport(), attr) > 0]
637 if getattr(e.wireprotosupport(), attr) > 0]
638
638
639 # If compression engines are listed in the config, assume there is a good
639 # If compression engines are listed in the config, assume there is a good
640 # reason for it (like server operators wanting to achieve specific
640 # reason for it (like server operators wanting to achieve specific
641 # performance characteristics). So fail fast if the config references
641 # performance characteristics). So fail fast if the config references
642 # unusable compression engines.
642 # unusable compression engines.
643 validnames = set(e.name() for e in compengines)
643 validnames = set(e.name() for e in compengines)
644 invalidnames = set(e for e in configengines if e not in validnames)
644 invalidnames = set(e for e in configengines if e not in validnames)
645 if invalidnames:
645 if invalidnames:
646 raise error.Abort(_('invalid compression engine defined in %s: %s') %
646 raise error.Abort(_('invalid compression engine defined in %s: %s') %
647 (config, ', '.join(sorted(invalidnames))))
647 (config, ', '.join(sorted(invalidnames))))
648
648
649 compengines = [e for e in compengines if e.name() in configengines]
649 compengines = [e for e in compengines if e.name() in configengines]
650 compengines = sorted(compengines,
650 compengines = sorted(compengines,
651 key=lambda e: configengines.index(e.name()))
651 key=lambda e: configengines.index(e.name()))
652
652
653 if not compengines:
653 if not compengines:
654 raise error.Abort(_('%s config option does not specify any known '
654 raise error.Abort(_('%s config option does not specify any known '
655 'compression engines') % config,
655 'compression engines') % config,
656 hint=_('usable compression engines: %s') %
656 hint=_('usable compression engines: %s') %
657 ', '.sorted(validnames))
657 ', '.sorted(validnames))
658
658
659 return compengines
659 return compengines
660
660
661 # list of commands
661 # list of commands
662 commands = {}
662 commands = {}
663
663
664 def wireprotocommand(name, args=''):
664 def wireprotocommand(name, args=''):
665 """decorator for wire protocol command"""
665 """decorator for wire protocol command"""
666 def register(func):
666 def register(func):
667 commands[name] = (func, args)
667 commands[name] = (func, args)
668 return func
668 return func
669 return register
669 return register
670
670
671 @wireprotocommand('batch', 'cmds *')
671 @wireprotocommand('batch', 'cmds *')
672 def batch(repo, proto, cmds, others):
672 def batch(repo, proto, cmds, others):
673 repo = repo.filtered("served")
673 repo = repo.filtered("served")
674 res = []
674 res = []
675 for pair in cmds.split(';'):
675 for pair in cmds.split(';'):
676 op, args = pair.split(' ', 1)
676 op, args = pair.split(' ', 1)
677 vals = {}
677 vals = {}
678 for a in args.split(','):
678 for a in args.split(','):
679 if a:
679 if a:
680 n, v = a.split('=')
680 n, v = a.split('=')
681 vals[unescapearg(n)] = unescapearg(v)
681 vals[unescapearg(n)] = unescapearg(v)
682 func, spec = commands[op]
682 func, spec = commands[op]
683 if spec:
683 if spec:
684 keys = spec.split()
684 keys = spec.split()
685 data = {}
685 data = {}
686 for k in keys:
686 for k in keys:
687 if k == '*':
687 if k == '*':
688 star = {}
688 star = {}
689 for key in vals.keys():
689 for key in vals.keys():
690 if key not in keys:
690 if key not in keys:
691 star[key] = vals[key]
691 star[key] = vals[key]
692 data['*'] = star
692 data['*'] = star
693 else:
693 else:
694 data[k] = vals[k]
694 data[k] = vals[k]
695 result = func(repo, proto, *[data[k] for k in keys])
695 result = func(repo, proto, *[data[k] for k in keys])
696 else:
696 else:
697 result = func(repo, proto)
697 result = func(repo, proto)
698 if isinstance(result, ooberror):
698 if isinstance(result, ooberror):
699 return result
699 return result
700 res.append(escapearg(result))
700 res.append(escapearg(result))
701 return ';'.join(res)
701 return ';'.join(res)
702
702
703 @wireprotocommand('between', 'pairs')
703 @wireprotocommand('between', 'pairs')
704 def between(repo, proto, pairs):
704 def between(repo, proto, pairs):
705 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
705 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
706 r = []
706 r = []
707 for b in repo.between(pairs):
707 for b in repo.between(pairs):
708 r.append(encodelist(b) + "\n")
708 r.append(encodelist(b) + "\n")
709 return "".join(r)
709 return "".join(r)
710
710
711 @wireprotocommand('branchmap')
711 @wireprotocommand('branchmap')
712 def branchmap(repo, proto):
712 def branchmap(repo, proto):
713 branchmap = repo.branchmap()
713 branchmap = repo.branchmap()
714 heads = []
714 heads = []
715 for branch, nodes in branchmap.iteritems():
715 for branch, nodes in branchmap.iteritems():
716 branchname = urlreq.quote(encoding.fromlocal(branch))
716 branchname = urlreq.quote(encoding.fromlocal(branch))
717 branchnodes = encodelist(nodes)
717 branchnodes = encodelist(nodes)
718 heads.append('%s %s' % (branchname, branchnodes))
718 heads.append('%s %s' % (branchname, branchnodes))
719 return '\n'.join(heads)
719 return '\n'.join(heads)
720
720
721 @wireprotocommand('branches', 'nodes')
721 @wireprotocommand('branches', 'nodes')
722 def branches(repo, proto, nodes):
722 def branches(repo, proto, nodes):
723 nodes = decodelist(nodes)
723 nodes = decodelist(nodes)
724 r = []
724 r = []
725 for b in repo.branches(nodes):
725 for b in repo.branches(nodes):
726 r.append(encodelist(b) + "\n")
726 r.append(encodelist(b) + "\n")
727 return "".join(r)
727 return "".join(r)
728
728
729 @wireprotocommand('clonebundles', '')
729 @wireprotocommand('clonebundles', '')
730 def clonebundles(repo, proto):
730 def clonebundles(repo, proto):
731 """Server command for returning info for available bundles to seed clones.
731 """Server command for returning info for available bundles to seed clones.
732
732
733 Clients will parse this response and determine what bundle to fetch.
733 Clients will parse this response and determine what bundle to fetch.
734
734
735 Extensions may wrap this command to filter or dynamically emit data
735 Extensions may wrap this command to filter or dynamically emit data
736 depending on the request. e.g. you could advertise URLs for the closest
736 depending on the request. e.g. you could advertise URLs for the closest
737 data center given the client's IP address.
737 data center given the client's IP address.
738 """
738 """
739 return repo.opener.tryread('clonebundles.manifest')
739 return repo.opener.tryread('clonebundles.manifest')
740
740
741 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
741 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
742 'known', 'getbundle', 'unbundlehash', 'batch']
742 'known', 'getbundle', 'unbundlehash', 'batch']
743
743
744 def _capabilities(repo, proto):
744 def _capabilities(repo, proto):
745 """return a list of capabilities for a repo
745 """return a list of capabilities for a repo
746
746
747 This function exists to allow extensions to easily wrap capabilities
747 This function exists to allow extensions to easily wrap capabilities
748 computation
748 computation
749
749
750 - returns a lists: easy to alter
750 - returns a lists: easy to alter
751 - change done here will be propagated to both `capabilities` and `hello`
751 - change done here will be propagated to both `capabilities` and `hello`
752 command without any other action needed.
752 command without any other action needed.
753 """
753 """
754 # copy to prevent modification of the global list
754 # copy to prevent modification of the global list
755 caps = list(wireprotocaps)
755 caps = list(wireprotocaps)
756 if streamclone.allowservergeneration(repo.ui):
756 if streamclone.allowservergeneration(repo.ui):
757 if repo.ui.configbool('server', 'preferuncompressed', False):
757 if repo.ui.configbool('server', 'preferuncompressed', False):
758 caps.append('stream-preferred')
758 caps.append('stream-preferred')
759 requiredformats = repo.requirements & repo.supportedformats
759 requiredformats = repo.requirements & repo.supportedformats
760 # if our local revlogs are just revlogv1, add 'stream' cap
760 # if our local revlogs are just revlogv1, add 'stream' cap
761 if not requiredformats - set(('revlogv1',)):
761 if not requiredformats - set(('revlogv1',)):
762 caps.append('stream')
762 caps.append('stream')
763 # otherwise, add 'streamreqs' detailing our local revlog format
763 # otherwise, add 'streamreqs' detailing our local revlog format
764 else:
764 else:
765 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
765 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
766 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
766 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
767 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
767 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
768 caps.append('bundle2=' + urlreq.quote(capsblob))
768 caps.append('bundle2=' + urlreq.quote(capsblob))
769 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
769 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
770
770
771 if proto.name == 'http':
771 if proto.name == 'http':
772 caps.append('httpheader=%d' %
772 caps.append('httpheader=%d' %
773 repo.ui.configint('server', 'maxhttpheaderlen', 1024))
773 repo.ui.configint('server', 'maxhttpheaderlen', 1024))
774 if repo.ui.configbool('experimental', 'httppostargs', False):
774 if repo.ui.configbool('experimental', 'httppostargs', False):
775 caps.append('httppostargs')
775 caps.append('httppostargs')
776
776
777 # FUTURE advertise 0.2rx once support is implemented
777 # FUTURE advertise 0.2rx once support is implemented
778 # FUTURE advertise minrx and mintx after consulting config option
778 # FUTURE advertise minrx and mintx after consulting config option
779 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
779 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
780
780
781 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
781 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
782 if compengines:
782 if compengines:
783 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
783 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
784 for e in compengines)
784 for e in compengines)
785 caps.append('compression=%s' % comptypes)
785 caps.append('compression=%s' % comptypes)
786
786
787 return caps
787 return caps
788
788
789 # If you are writing an extension and consider wrapping this function. Wrap
789 # If you are writing an extension and consider wrapping this function. Wrap
790 # `_capabilities` instead.
790 # `_capabilities` instead.
791 @wireprotocommand('capabilities')
791 @wireprotocommand('capabilities')
792 def capabilities(repo, proto):
792 def capabilities(repo, proto):
793 return ' '.join(_capabilities(repo, proto))
793 return ' '.join(_capabilities(repo, proto))
794
794
795 @wireprotocommand('changegroup', 'roots')
795 @wireprotocommand('changegroup', 'roots')
796 def changegroup(repo, proto, roots):
796 def changegroup(repo, proto, roots):
797 nodes = decodelist(roots)
797 nodes = decodelist(roots)
798 cg = changegroupmod.changegroup(repo, nodes, 'serve')
798 cg = changegroupmod.changegroup(repo, nodes, 'serve')
799 return streamres(reader=cg, v1compressible=True)
799 return streamres(reader=cg, v1compressible=True)
800
800
801 @wireprotocommand('changegroupsubset', 'bases heads')
801 @wireprotocommand('changegroupsubset', 'bases heads')
802 def changegroupsubset(repo, proto, bases, heads):
802 def changegroupsubset(repo, proto, bases, heads):
803 bases = decodelist(bases)
803 bases = decodelist(bases)
804 heads = decodelist(heads)
804 heads = decodelist(heads)
805 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
805 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
806 return streamres(reader=cg, v1compressible=True)
806 return streamres(reader=cg, v1compressible=True)
807
807
808 @wireprotocommand('debugwireargs', 'one two *')
808 @wireprotocommand('debugwireargs', 'one two *')
809 def debugwireargs(repo, proto, one, two, others):
809 def debugwireargs(repo, proto, one, two, others):
810 # only accept optional args from the known set
810 # only accept optional args from the known set
811 opts = options('debugwireargs', ['three', 'four'], others)
811 opts = options('debugwireargs', ['three', 'four'], others)
812 return repo.debugwireargs(one, two, **opts)
812 return repo.debugwireargs(one, two, **opts)
813
813
814 @wireprotocommand('getbundle', '*')
814 @wireprotocommand('getbundle', '*')
815 def getbundle(repo, proto, others):
815 def getbundle(repo, proto, others):
816 opts = options('getbundle', gboptsmap.keys(), others)
816 opts = options('getbundle', gboptsmap.keys(), others)
817 for k, v in opts.iteritems():
817 for k, v in opts.iteritems():
818 keytype = gboptsmap[k]
818 keytype = gboptsmap[k]
819 if keytype == 'nodes':
819 if keytype == 'nodes':
820 opts[k] = decodelist(v)
820 opts[k] = decodelist(v)
821 elif keytype == 'csv':
821 elif keytype == 'csv':
822 opts[k] = list(v.split(','))
822 opts[k] = list(v.split(','))
823 elif keytype == 'scsv':
823 elif keytype == 'scsv':
824 opts[k] = set(v.split(','))
824 opts[k] = set(v.split(','))
825 elif keytype == 'boolean':
825 elif keytype == 'boolean':
826 # Client should serialize False as '0', which is a non-empty string
826 # Client should serialize False as '0', which is a non-empty string
827 # so it evaluates as a True bool.
827 # so it evaluates as a True bool.
828 if v == '0':
828 if v == '0':
829 opts[k] = False
829 opts[k] = False
830 else:
830 else:
831 opts[k] = bool(v)
831 opts[k] = bool(v)
832 elif keytype != 'plain':
832 elif keytype != 'plain':
833 raise KeyError('unknown getbundle option type %s'
833 raise KeyError('unknown getbundle option type %s'
834 % keytype)
834 % keytype)
835
835
836 if not bundle1allowed(repo, 'pull'):
836 if not bundle1allowed(repo, 'pull'):
837 if not exchange.bundle2requested(opts.get('bundlecaps')):
837 if not exchange.bundle2requested(opts.get('bundlecaps')):
838 if proto.name == 'http':
838 if proto.name == 'http':
839 return ooberror(bundle2required)
839 return ooberror(bundle2required)
840 raise error.Abort(bundle2requiredmain,
840 raise error.Abort(bundle2requiredmain,
841 hint=bundle2requiredhint)
841 hint=bundle2requiredhint)
842
842
843 #chunks = exchange.getbundlechunks(repo, 'serve', **opts)
844 try:
843 try:
845 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
844 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
846 except error.Abort as exc:
845 except error.Abort as exc:
847 # cleanly forward Abort error to the client
846 # cleanly forward Abort error to the client
848 if not exchange.bundle2requested(opts.get('bundlecaps')):
847 if not exchange.bundle2requested(opts.get('bundlecaps')):
849 if proto.name == 'http':
848 if proto.name == 'http':
850 return ooberror(str(exc) + '\n')
849 return ooberror(str(exc) + '\n')
851 raise # cannot do better for bundle1 + ssh
850 raise # cannot do better for bundle1 + ssh
852 # bundle2 request expect a bundle2 reply
851 # bundle2 request expect a bundle2 reply
853 bundler = bundle2.bundle20(repo.ui)
852 bundler = bundle2.bundle20(repo.ui)
854 manargs = [('message', str(exc))]
853 manargs = [('message', str(exc))]
855 advargs = []
854 advargs = []
856 if exc.hint is not None:
855 if exc.hint is not None:
857 advargs.append(('hint', exc.hint))
856 advargs.append(('hint', exc.hint))
858 bundler.addpart(bundle2.bundlepart('error:abort',
857 bundler.addpart(bundle2.bundlepart('error:abort',
859 manargs, advargs))
858 manargs, advargs))
860 return streamres(gen=bundler.getchunks(), v1compressible=True)
859 return streamres(gen=bundler.getchunks(), v1compressible=True)
861 return streamres(gen=chunks, v1compressible=True)
860 return streamres(gen=chunks, v1compressible=True)
862
861
863 @wireprotocommand('heads')
862 @wireprotocommand('heads')
864 def heads(repo, proto):
863 def heads(repo, proto):
865 h = repo.heads()
864 h = repo.heads()
866 return encodelist(h) + "\n"
865 return encodelist(h) + "\n"
867
866
868 @wireprotocommand('hello')
867 @wireprotocommand('hello')
869 def hello(repo, proto):
868 def hello(repo, proto):
870 '''the hello command returns a set of lines describing various
869 '''the hello command returns a set of lines describing various
871 interesting things about the server, in an RFC822-like format.
870 interesting things about the server, in an RFC822-like format.
872 Currently the only one defined is "capabilities", which
871 Currently the only one defined is "capabilities", which
873 consists of a line in the form:
872 consists of a line in the form:
874
873
875 capabilities: space separated list of tokens
874 capabilities: space separated list of tokens
876 '''
875 '''
877 return "capabilities: %s\n" % (capabilities(repo, proto))
876 return "capabilities: %s\n" % (capabilities(repo, proto))
878
877
879 @wireprotocommand('listkeys', 'namespace')
878 @wireprotocommand('listkeys', 'namespace')
880 def listkeys(repo, proto, namespace):
879 def listkeys(repo, proto, namespace):
881 d = repo.listkeys(encoding.tolocal(namespace)).items()
880 d = repo.listkeys(encoding.tolocal(namespace)).items()
882 return pushkeymod.encodekeys(d)
881 return pushkeymod.encodekeys(d)
883
882
884 @wireprotocommand('lookup', 'key')
883 @wireprotocommand('lookup', 'key')
885 def lookup(repo, proto, key):
884 def lookup(repo, proto, key):
886 try:
885 try:
887 k = encoding.tolocal(key)
886 k = encoding.tolocal(key)
888 c = repo[k]
887 c = repo[k]
889 r = c.hex()
888 r = c.hex()
890 success = 1
889 success = 1
891 except Exception as inst:
890 except Exception as inst:
892 r = str(inst)
891 r = str(inst)
893 success = 0
892 success = 0
894 return "%s %s\n" % (success, r)
893 return "%s %s\n" % (success, r)
895
894
896 @wireprotocommand('known', 'nodes *')
895 @wireprotocommand('known', 'nodes *')
897 def known(repo, proto, nodes, others):
896 def known(repo, proto, nodes, others):
898 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
897 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
899
898
900 @wireprotocommand('pushkey', 'namespace key old new')
899 @wireprotocommand('pushkey', 'namespace key old new')
901 def pushkey(repo, proto, namespace, key, old, new):
900 def pushkey(repo, proto, namespace, key, old, new):
902 # compatibility with pre-1.8 clients which were accidentally
901 # compatibility with pre-1.8 clients which were accidentally
903 # sending raw binary nodes rather than utf-8-encoded hex
902 # sending raw binary nodes rather than utf-8-encoded hex
904 if len(new) == 20 and new.encode('string-escape') != new:
903 if len(new) == 20 and new.encode('string-escape') != new:
905 # looks like it could be a binary node
904 # looks like it could be a binary node
906 try:
905 try:
907 new.decode('utf-8')
906 new.decode('utf-8')
908 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
907 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
909 except UnicodeDecodeError:
908 except UnicodeDecodeError:
910 pass # binary, leave unmodified
909 pass # binary, leave unmodified
911 else:
910 else:
912 new = encoding.tolocal(new) # normal path
911 new = encoding.tolocal(new) # normal path
913
912
914 if util.safehasattr(proto, 'restore'):
913 if util.safehasattr(proto, 'restore'):
915
914
916 proto.redirect()
915 proto.redirect()
917
916
918 try:
917 try:
919 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
918 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
920 encoding.tolocal(old), new) or False
919 encoding.tolocal(old), new) or False
921 except error.Abort:
920 except error.Abort:
922 r = False
921 r = False
923
922
924 output = proto.restore()
923 output = proto.restore()
925
924
926 return '%s\n%s' % (int(r), output)
925 return '%s\n%s' % (int(r), output)
927
926
928 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
927 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
929 encoding.tolocal(old), new)
928 encoding.tolocal(old), new)
930 return '%s\n' % int(r)
929 return '%s\n' % int(r)
931
930
932 @wireprotocommand('stream_out')
931 @wireprotocommand('stream_out')
933 def stream(repo, proto):
932 def stream(repo, proto):
934 '''If the server supports streaming clone, it advertises the "stream"
933 '''If the server supports streaming clone, it advertises the "stream"
935 capability with a value representing the version and flags of the repo
934 capability with a value representing the version and flags of the repo
936 it is serving. Client checks to see if it understands the format.
935 it is serving. Client checks to see if it understands the format.
937 '''
936 '''
938 if not streamclone.allowservergeneration(repo.ui):
937 if not streamclone.allowservergeneration(repo.ui):
939 return '1\n'
938 return '1\n'
940
939
941 def getstream(it):
940 def getstream(it):
942 yield '0\n'
941 yield '0\n'
943 for chunk in it:
942 for chunk in it:
944 yield chunk
943 yield chunk
945
944
946 try:
945 try:
947 # LockError may be raised before the first result is yielded. Don't
946 # LockError may be raised before the first result is yielded. Don't
948 # emit output until we're sure we got the lock successfully.
947 # emit output until we're sure we got the lock successfully.
949 it = streamclone.generatev1wireproto(repo)
948 it = streamclone.generatev1wireproto(repo)
950 return streamres(gen=getstream(it))
949 return streamres(gen=getstream(it))
951 except error.LockError:
950 except error.LockError:
952 return '2\n'
951 return '2\n'
953
952
954 @wireprotocommand('unbundle', 'heads')
953 @wireprotocommand('unbundle', 'heads')
955 def unbundle(repo, proto, heads):
954 def unbundle(repo, proto, heads):
956 their_heads = decodelist(heads)
955 their_heads = decodelist(heads)
957
956
958 try:
957 try:
959 proto.redirect()
958 proto.redirect()
960
959
961 exchange.check_heads(repo, their_heads, 'preparing changes')
960 exchange.check_heads(repo, their_heads, 'preparing changes')
962
961
963 # write bundle data to temporary file because it can be big
962 # write bundle data to temporary file because it can be big
964 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
963 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
965 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
964 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
966 r = 0
965 r = 0
967 try:
966 try:
968 proto.getfile(fp)
967 proto.getfile(fp)
969 fp.seek(0)
968 fp.seek(0)
970 gen = exchange.readbundle(repo.ui, fp, None)
969 gen = exchange.readbundle(repo.ui, fp, None)
971 if (isinstance(gen, changegroupmod.cg1unpacker)
970 if (isinstance(gen, changegroupmod.cg1unpacker)
972 and not bundle1allowed(repo, 'push')):
971 and not bundle1allowed(repo, 'push')):
973 if proto.name == 'http':
972 if proto.name == 'http':
974 # need to special case http because stderr do not get to
973 # need to special case http because stderr do not get to
975 # the http client on failed push so we need to abuse some
974 # the http client on failed push so we need to abuse some
976 # other error type to make sure the message get to the
975 # other error type to make sure the message get to the
977 # user.
976 # user.
978 return ooberror(bundle2required)
977 return ooberror(bundle2required)
979 raise error.Abort(bundle2requiredmain,
978 raise error.Abort(bundle2requiredmain,
980 hint=bundle2requiredhint)
979 hint=bundle2requiredhint)
981
980
982 r = exchange.unbundle(repo, gen, their_heads, 'serve',
981 r = exchange.unbundle(repo, gen, their_heads, 'serve',
983 proto._client())
982 proto._client())
984 if util.safehasattr(r, 'addpart'):
983 if util.safehasattr(r, 'addpart'):
985 # The return looks streamable, we are in the bundle2 case and
984 # The return looks streamable, we are in the bundle2 case and
986 # should return a stream.
985 # should return a stream.
987 return streamres(gen=r.getchunks())
986 return streamres(gen=r.getchunks())
988 return pushres(r)
987 return pushres(r)
989
988
990 finally:
989 finally:
991 fp.close()
990 fp.close()
992 os.unlink(tempname)
991 os.unlink(tempname)
993
992
994 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
993 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
995 # handle non-bundle2 case first
994 # handle non-bundle2 case first
996 if not getattr(exc, 'duringunbundle2', False):
995 if not getattr(exc, 'duringunbundle2', False):
997 try:
996 try:
998 raise
997 raise
999 except error.Abort:
998 except error.Abort:
1000 # The old code we moved used util.stderr directly.
999 # The old code we moved used util.stderr directly.
1001 # We did not change it to minimise code change.
1000 # We did not change it to minimise code change.
1002 # This need to be moved to something proper.
1001 # This need to be moved to something proper.
1003 # Feel free to do it.
1002 # Feel free to do it.
1004 util.stderr.write("abort: %s\n" % exc)
1003 util.stderr.write("abort: %s\n" % exc)
1005 if exc.hint is not None:
1004 if exc.hint is not None:
1006 util.stderr.write("(%s)\n" % exc.hint)
1005 util.stderr.write("(%s)\n" % exc.hint)
1007 return pushres(0)
1006 return pushres(0)
1008 except error.PushRaced:
1007 except error.PushRaced:
1009 return pusherr(str(exc))
1008 return pusherr(str(exc))
1010
1009
1011 bundler = bundle2.bundle20(repo.ui)
1010 bundler = bundle2.bundle20(repo.ui)
1012 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1011 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1013 bundler.addpart(out)
1012 bundler.addpart(out)
1014 try:
1013 try:
1015 try:
1014 try:
1016 raise
1015 raise
1017 except error.PushkeyFailed as exc:
1016 except error.PushkeyFailed as exc:
1018 # check client caps
1017 # check client caps
1019 remotecaps = getattr(exc, '_replycaps', None)
1018 remotecaps = getattr(exc, '_replycaps', None)
1020 if (remotecaps is not None
1019 if (remotecaps is not None
1021 and 'pushkey' not in remotecaps.get('error', ())):
1020 and 'pushkey' not in remotecaps.get('error', ())):
1022 # no support remote side, fallback to Abort handler.
1021 # no support remote side, fallback to Abort handler.
1023 raise
1022 raise
1024 part = bundler.newpart('error:pushkey')
1023 part = bundler.newpart('error:pushkey')
1025 part.addparam('in-reply-to', exc.partid)
1024 part.addparam('in-reply-to', exc.partid)
1026 if exc.namespace is not None:
1025 if exc.namespace is not None:
1027 part.addparam('namespace', exc.namespace, mandatory=False)
1026 part.addparam('namespace', exc.namespace, mandatory=False)
1028 if exc.key is not None:
1027 if exc.key is not None:
1029 part.addparam('key', exc.key, mandatory=False)
1028 part.addparam('key', exc.key, mandatory=False)
1030 if exc.new is not None:
1029 if exc.new is not None:
1031 part.addparam('new', exc.new, mandatory=False)
1030 part.addparam('new', exc.new, mandatory=False)
1032 if exc.old is not None:
1031 if exc.old is not None:
1033 part.addparam('old', exc.old, mandatory=False)
1032 part.addparam('old', exc.old, mandatory=False)
1034 if exc.ret is not None:
1033 if exc.ret is not None:
1035 part.addparam('ret', exc.ret, mandatory=False)
1034 part.addparam('ret', exc.ret, mandatory=False)
1036 except error.BundleValueError as exc:
1035 except error.BundleValueError as exc:
1037 errpart = bundler.newpart('error:unsupportedcontent')
1036 errpart = bundler.newpart('error:unsupportedcontent')
1038 if exc.parttype is not None:
1037 if exc.parttype is not None:
1039 errpart.addparam('parttype', exc.parttype)
1038 errpart.addparam('parttype', exc.parttype)
1040 if exc.params:
1039 if exc.params:
1041 errpart.addparam('params', '\0'.join(exc.params))
1040 errpart.addparam('params', '\0'.join(exc.params))
1042 except error.Abort as exc:
1041 except error.Abort as exc:
1043 manargs = [('message', str(exc))]
1042 manargs = [('message', str(exc))]
1044 advargs = []
1043 advargs = []
1045 if exc.hint is not None:
1044 if exc.hint is not None:
1046 advargs.append(('hint', exc.hint))
1045 advargs.append(('hint', exc.hint))
1047 bundler.addpart(bundle2.bundlepart('error:abort',
1046 bundler.addpart(bundle2.bundlepart('error:abort',
1048 manargs, advargs))
1047 manargs, advargs))
1049 except error.PushRaced as exc:
1048 except error.PushRaced as exc:
1050 bundler.newpart('error:pushraced', [('message', str(exc))])
1049 bundler.newpart('error:pushraced', [('message', str(exc))])
1051 return streamres(gen=bundler.getchunks())
1050 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now