##// END OF EJS Templates
wireproto: update reference to deleted addchangegroup()...
Martin von Zweigbergk -
r32880:4c2a46f8 default
parent child Browse files
Show More
@@ -1,1062 +1,1062 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import itertools
11 import itertools
12 import os
12 import os
13 import tempfile
13 import tempfile
14
14
15 from .i18n import _
15 from .i18n import _
16 from .node import (
16 from .node import (
17 bin,
17 bin,
18 hex,
18 hex,
19 nullid,
19 nullid,
20 )
20 )
21
21
22 from . import (
22 from . import (
23 bundle2,
23 bundle2,
24 changegroup as changegroupmod,
24 changegroup as changegroupmod,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 pycompat,
30 pycompat,
31 streamclone,
31 streamclone,
32 util,
32 util,
33 )
33 )
34
34
35 urlerr = util.urlerr
35 urlerr = util.urlerr
36 urlreq = util.urlreq
36 urlreq = util.urlreq
37
37
38 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
38 bundle2requiredmain = _('incompatible Mercurial client; bundle2 required')
39 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
39 bundle2requiredhint = _('see https://www.mercurial-scm.org/wiki/'
40 'IncompatibleClient')
40 'IncompatibleClient')
41 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
41 bundle2required = '%s\n(%s)\n' % (bundle2requiredmain, bundle2requiredhint)
42
42
43 class abstractserverproto(object):
43 class abstractserverproto(object):
44 """abstract class that summarizes the protocol API
44 """abstract class that summarizes the protocol API
45
45
46 Used as reference and documentation.
46 Used as reference and documentation.
47 """
47 """
48
48
49 def getargs(self, args):
49 def getargs(self, args):
50 """return the value for arguments in <args>
50 """return the value for arguments in <args>
51
51
52 returns a list of values (same order as <args>)"""
52 returns a list of values (same order as <args>)"""
53 raise NotImplementedError()
53 raise NotImplementedError()
54
54
55 def getfile(self, fp):
55 def getfile(self, fp):
56 """write the whole content of a file into a file like object
56 """write the whole content of a file into a file like object
57
57
58 The file is in the form::
58 The file is in the form::
59
59
60 (<chunk-size>\n<chunk>)+0\n
60 (<chunk-size>\n<chunk>)+0\n
61
61
62 chunk size is the ascii version of the int.
62 chunk size is the ascii version of the int.
63 """
63 """
64 raise NotImplementedError()
64 raise NotImplementedError()
65
65
66 def redirect(self):
66 def redirect(self):
67 """may setup interception for stdout and stderr
67 """may setup interception for stdout and stderr
68
68
69 See also the `restore` method."""
69 See also the `restore` method."""
70 raise NotImplementedError()
70 raise NotImplementedError()
71
71
72 # If the `redirect` function does install interception, the `restore`
72 # If the `redirect` function does install interception, the `restore`
73 # function MUST be defined. If interception is not used, this function
73 # function MUST be defined. If interception is not used, this function
74 # MUST NOT be defined.
74 # MUST NOT be defined.
75 #
75 #
76 # left commented here on purpose
76 # left commented here on purpose
77 #
77 #
78 #def restore(self):
78 #def restore(self):
79 # """reinstall previous stdout and stderr and return intercepted stdout
79 # """reinstall previous stdout and stderr and return intercepted stdout
80 # """
80 # """
81 # raise NotImplementedError()
81 # raise NotImplementedError()
82
82
83 class remotebatch(peer.batcher):
83 class remotebatch(peer.batcher):
84 '''batches the queued calls; uses as few roundtrips as possible'''
84 '''batches the queued calls; uses as few roundtrips as possible'''
85 def __init__(self, remote):
85 def __init__(self, remote):
86 '''remote must support _submitbatch(encbatch) and
86 '''remote must support _submitbatch(encbatch) and
87 _submitone(op, encargs)'''
87 _submitone(op, encargs)'''
88 peer.batcher.__init__(self)
88 peer.batcher.__init__(self)
89 self.remote = remote
89 self.remote = remote
90 def submit(self):
90 def submit(self):
91 req, rsp = [], []
91 req, rsp = [], []
92 for name, args, opts, resref in self.calls:
92 for name, args, opts, resref in self.calls:
93 mtd = getattr(self.remote, name)
93 mtd = getattr(self.remote, name)
94 batchablefn = getattr(mtd, 'batchable', None)
94 batchablefn = getattr(mtd, 'batchable', None)
95 if batchablefn is not None:
95 if batchablefn is not None:
96 batchable = batchablefn(mtd.im_self, *args, **opts)
96 batchable = batchablefn(mtd.im_self, *args, **opts)
97 encargsorres, encresref = next(batchable)
97 encargsorres, encresref = next(batchable)
98 if encresref:
98 if encresref:
99 req.append((name, encargsorres,))
99 req.append((name, encargsorres,))
100 rsp.append((batchable, encresref, resref,))
100 rsp.append((batchable, encresref, resref,))
101 else:
101 else:
102 resref.set(encargsorres)
102 resref.set(encargsorres)
103 else:
103 else:
104 if req:
104 if req:
105 self._submitreq(req, rsp)
105 self._submitreq(req, rsp)
106 req, rsp = [], []
106 req, rsp = [], []
107 resref.set(mtd(*args, **opts))
107 resref.set(mtd(*args, **opts))
108 if req:
108 if req:
109 self._submitreq(req, rsp)
109 self._submitreq(req, rsp)
110 def _submitreq(self, req, rsp):
110 def _submitreq(self, req, rsp):
111 encresults = self.remote._submitbatch(req)
111 encresults = self.remote._submitbatch(req)
112 for encres, r in zip(encresults, rsp):
112 for encres, r in zip(encresults, rsp):
113 batchable, encresref, resref = r
113 batchable, encresref, resref = r
114 encresref.set(encres)
114 encresref.set(encres)
115 resref.set(next(batchable))
115 resref.set(next(batchable))
116
116
117 class remoteiterbatcher(peer.iterbatcher):
117 class remoteiterbatcher(peer.iterbatcher):
118 def __init__(self, remote):
118 def __init__(self, remote):
119 super(remoteiterbatcher, self).__init__()
119 super(remoteiterbatcher, self).__init__()
120 self._remote = remote
120 self._remote = remote
121
121
122 def __getattr__(self, name):
122 def __getattr__(self, name):
123 if not getattr(self._remote, name, False):
123 if not getattr(self._remote, name, False):
124 raise AttributeError(
124 raise AttributeError(
125 'Attempted to iterbatch non-batchable call to %r' % name)
125 'Attempted to iterbatch non-batchable call to %r' % name)
126 return super(remoteiterbatcher, self).__getattr__(name)
126 return super(remoteiterbatcher, self).__getattr__(name)
127
127
128 def submit(self):
128 def submit(self):
129 """Break the batch request into many patch calls and pipeline them.
129 """Break the batch request into many patch calls and pipeline them.
130
130
131 This is mostly valuable over http where request sizes can be
131 This is mostly valuable over http where request sizes can be
132 limited, but can be used in other places as well.
132 limited, but can be used in other places as well.
133 """
133 """
134 req, rsp = [], []
134 req, rsp = [], []
135 for name, args, opts, resref in self.calls:
135 for name, args, opts, resref in self.calls:
136 mtd = getattr(self._remote, name)
136 mtd = getattr(self._remote, name)
137 batchable = mtd.batchable(mtd.im_self, *args, **opts)
137 batchable = mtd.batchable(mtd.im_self, *args, **opts)
138 encargsorres, encresref = next(batchable)
138 encargsorres, encresref = next(batchable)
139 assert encresref
139 assert encresref
140 req.append((name, encargsorres))
140 req.append((name, encargsorres))
141 rsp.append((batchable, encresref))
141 rsp.append((batchable, encresref))
142 if req:
142 if req:
143 self._resultiter = self._remote._submitbatch(req)
143 self._resultiter = self._remote._submitbatch(req)
144 self._rsp = rsp
144 self._rsp = rsp
145
145
146 def results(self):
146 def results(self):
147 for (batchable, encresref), encres in itertools.izip(
147 for (batchable, encresref), encres in itertools.izip(
148 self._rsp, self._resultiter):
148 self._rsp, self._resultiter):
149 encresref.set(encres)
149 encresref.set(encres)
150 yield next(batchable)
150 yield next(batchable)
151
151
152 # Forward a couple of names from peer to make wireproto interactions
152 # Forward a couple of names from peer to make wireproto interactions
153 # slightly more sensible.
153 # slightly more sensible.
154 batchable = peer.batchable
154 batchable = peer.batchable
155 future = peer.future
155 future = peer.future
156
156
157 # list of nodes encoding / decoding
157 # list of nodes encoding / decoding
158
158
159 def decodelist(l, sep=' '):
159 def decodelist(l, sep=' '):
160 if l:
160 if l:
161 return map(bin, l.split(sep))
161 return map(bin, l.split(sep))
162 return []
162 return []
163
163
164 def encodelist(l, sep=' '):
164 def encodelist(l, sep=' '):
165 try:
165 try:
166 return sep.join(map(hex, l))
166 return sep.join(map(hex, l))
167 except TypeError:
167 except TypeError:
168 raise
168 raise
169
169
170 # batched call argument encoding
170 # batched call argument encoding
171
171
172 def escapearg(plain):
172 def escapearg(plain):
173 return (plain
173 return (plain
174 .replace(':', ':c')
174 .replace(':', ':c')
175 .replace(',', ':o')
175 .replace(',', ':o')
176 .replace(';', ':s')
176 .replace(';', ':s')
177 .replace('=', ':e'))
177 .replace('=', ':e'))
178
178
179 def unescapearg(escaped):
179 def unescapearg(escaped):
180 return (escaped
180 return (escaped
181 .replace(':e', '=')
181 .replace(':e', '=')
182 .replace(':s', ';')
182 .replace(':s', ';')
183 .replace(':o', ',')
183 .replace(':o', ',')
184 .replace(':c', ':'))
184 .replace(':c', ':'))
185
185
186 def encodebatchcmds(req):
186 def encodebatchcmds(req):
187 """Return a ``cmds`` argument value for the ``batch`` command."""
187 """Return a ``cmds`` argument value for the ``batch`` command."""
188 cmds = []
188 cmds = []
189 for op, argsdict in req:
189 for op, argsdict in req:
190 # Old servers didn't properly unescape argument names. So prevent
190 # Old servers didn't properly unescape argument names. So prevent
191 # the sending of argument names that may not be decoded properly by
191 # the sending of argument names that may not be decoded properly by
192 # servers.
192 # servers.
193 assert all(escapearg(k) == k for k in argsdict)
193 assert all(escapearg(k) == k for k in argsdict)
194
194
195 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
195 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
196 for k, v in argsdict.iteritems())
196 for k, v in argsdict.iteritems())
197 cmds.append('%s %s' % (op, args))
197 cmds.append('%s %s' % (op, args))
198
198
199 return ';'.join(cmds)
199 return ';'.join(cmds)
200
200
201 # mapping of options accepted by getbundle and their types
201 # mapping of options accepted by getbundle and their types
202 #
202 #
203 # Meant to be extended by extensions. It is extensions responsibility to ensure
203 # Meant to be extended by extensions. It is extensions responsibility to ensure
204 # such options are properly processed in exchange.getbundle.
204 # such options are properly processed in exchange.getbundle.
205 #
205 #
206 # supported types are:
206 # supported types are:
207 #
207 #
208 # :nodes: list of binary nodes
208 # :nodes: list of binary nodes
209 # :csv: list of comma-separated values
209 # :csv: list of comma-separated values
210 # :scsv: list of comma-separated values return as set
210 # :scsv: list of comma-separated values return as set
211 # :plain: string with no transformation needed.
211 # :plain: string with no transformation needed.
212 gboptsmap = {'heads': 'nodes',
212 gboptsmap = {'heads': 'nodes',
213 'common': 'nodes',
213 'common': 'nodes',
214 'obsmarkers': 'boolean',
214 'obsmarkers': 'boolean',
215 'bundlecaps': 'scsv',
215 'bundlecaps': 'scsv',
216 'listkeys': 'csv',
216 'listkeys': 'csv',
217 'cg': 'boolean',
217 'cg': 'boolean',
218 'cbattempted': 'boolean'}
218 'cbattempted': 'boolean'}
219
219
220 # client side
220 # client side
221
221
222 class wirepeer(peer.peerrepository):
222 class wirepeer(peer.peerrepository):
223 """Client-side interface for communicating with a peer repository.
223 """Client-side interface for communicating with a peer repository.
224
224
225 Methods commonly call wire protocol commands of the same name.
225 Methods commonly call wire protocol commands of the same name.
226
226
227 See also httppeer.py and sshpeer.py for protocol-specific
227 See also httppeer.py and sshpeer.py for protocol-specific
228 implementations of this interface.
228 implementations of this interface.
229 """
229 """
230 def batch(self):
230 def batch(self):
231 if self.capable('batch'):
231 if self.capable('batch'):
232 return remotebatch(self)
232 return remotebatch(self)
233 else:
233 else:
234 return peer.localbatch(self)
234 return peer.localbatch(self)
235 def _submitbatch(self, req):
235 def _submitbatch(self, req):
236 """run batch request <req> on the server
236 """run batch request <req> on the server
237
237
238 Returns an iterator of the raw responses from the server.
238 Returns an iterator of the raw responses from the server.
239 """
239 """
240 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
240 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
241 chunk = rsp.read(1024)
241 chunk = rsp.read(1024)
242 work = [chunk]
242 work = [chunk]
243 while chunk:
243 while chunk:
244 while ';' not in chunk and chunk:
244 while ';' not in chunk and chunk:
245 chunk = rsp.read(1024)
245 chunk = rsp.read(1024)
246 work.append(chunk)
246 work.append(chunk)
247 merged = ''.join(work)
247 merged = ''.join(work)
248 while ';' in merged:
248 while ';' in merged:
249 one, merged = merged.split(';', 1)
249 one, merged = merged.split(';', 1)
250 yield unescapearg(one)
250 yield unescapearg(one)
251 chunk = rsp.read(1024)
251 chunk = rsp.read(1024)
252 work = [merged, chunk]
252 work = [merged, chunk]
253 yield unescapearg(''.join(work))
253 yield unescapearg(''.join(work))
254
254
255 def _submitone(self, op, args):
255 def _submitone(self, op, args):
256 return self._call(op, **args)
256 return self._call(op, **args)
257
257
258 def iterbatch(self):
258 def iterbatch(self):
259 return remoteiterbatcher(self)
259 return remoteiterbatcher(self)
260
260
261 @batchable
261 @batchable
262 def lookup(self, key):
262 def lookup(self, key):
263 self.requirecap('lookup', _('look up remote revision'))
263 self.requirecap('lookup', _('look up remote revision'))
264 f = future()
264 f = future()
265 yield {'key': encoding.fromlocal(key)}, f
265 yield {'key': encoding.fromlocal(key)}, f
266 d = f.value
266 d = f.value
267 success, data = d[:-1].split(" ", 1)
267 success, data = d[:-1].split(" ", 1)
268 if int(success):
268 if int(success):
269 yield bin(data)
269 yield bin(data)
270 self._abort(error.RepoError(data))
270 self._abort(error.RepoError(data))
271
271
272 @batchable
272 @batchable
273 def heads(self):
273 def heads(self):
274 f = future()
274 f = future()
275 yield {}, f
275 yield {}, f
276 d = f.value
276 d = f.value
277 try:
277 try:
278 yield decodelist(d[:-1])
278 yield decodelist(d[:-1])
279 except ValueError:
279 except ValueError:
280 self._abort(error.ResponseError(_("unexpected response:"), d))
280 self._abort(error.ResponseError(_("unexpected response:"), d))
281
281
282 @batchable
282 @batchable
283 def known(self, nodes):
283 def known(self, nodes):
284 f = future()
284 f = future()
285 yield {'nodes': encodelist(nodes)}, f
285 yield {'nodes': encodelist(nodes)}, f
286 d = f.value
286 d = f.value
287 try:
287 try:
288 yield [bool(int(b)) for b in d]
288 yield [bool(int(b)) for b in d]
289 except ValueError:
289 except ValueError:
290 self._abort(error.ResponseError(_("unexpected response:"), d))
290 self._abort(error.ResponseError(_("unexpected response:"), d))
291
291
292 @batchable
292 @batchable
293 def branchmap(self):
293 def branchmap(self):
294 f = future()
294 f = future()
295 yield {}, f
295 yield {}, f
296 d = f.value
296 d = f.value
297 try:
297 try:
298 branchmap = {}
298 branchmap = {}
299 for branchpart in d.splitlines():
299 for branchpart in d.splitlines():
300 branchname, branchheads = branchpart.split(' ', 1)
300 branchname, branchheads = branchpart.split(' ', 1)
301 branchname = encoding.tolocal(urlreq.unquote(branchname))
301 branchname = encoding.tolocal(urlreq.unquote(branchname))
302 branchheads = decodelist(branchheads)
302 branchheads = decodelist(branchheads)
303 branchmap[branchname] = branchheads
303 branchmap[branchname] = branchheads
304 yield branchmap
304 yield branchmap
305 except TypeError:
305 except TypeError:
306 self._abort(error.ResponseError(_("unexpected response:"), d))
306 self._abort(error.ResponseError(_("unexpected response:"), d))
307
307
308 def branches(self, nodes):
308 def branches(self, nodes):
309 n = encodelist(nodes)
309 n = encodelist(nodes)
310 d = self._call("branches", nodes=n)
310 d = self._call("branches", nodes=n)
311 try:
311 try:
312 br = [tuple(decodelist(b)) for b in d.splitlines()]
312 br = [tuple(decodelist(b)) for b in d.splitlines()]
313 return br
313 return br
314 except ValueError:
314 except ValueError:
315 self._abort(error.ResponseError(_("unexpected response:"), d))
315 self._abort(error.ResponseError(_("unexpected response:"), d))
316
316
317 def between(self, pairs):
317 def between(self, pairs):
318 batch = 8 # avoid giant requests
318 batch = 8 # avoid giant requests
319 r = []
319 r = []
320 for i in xrange(0, len(pairs), batch):
320 for i in xrange(0, len(pairs), batch):
321 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
321 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
322 d = self._call("between", pairs=n)
322 d = self._call("between", pairs=n)
323 try:
323 try:
324 r.extend(l and decodelist(l) or [] for l in d.splitlines())
324 r.extend(l and decodelist(l) or [] for l in d.splitlines())
325 except ValueError:
325 except ValueError:
326 self._abort(error.ResponseError(_("unexpected response:"), d))
326 self._abort(error.ResponseError(_("unexpected response:"), d))
327 return r
327 return r
328
328
329 @batchable
329 @batchable
330 def pushkey(self, namespace, key, old, new):
330 def pushkey(self, namespace, key, old, new):
331 if not self.capable('pushkey'):
331 if not self.capable('pushkey'):
332 yield False, None
332 yield False, None
333 f = future()
333 f = future()
334 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
334 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
335 yield {'namespace': encoding.fromlocal(namespace),
335 yield {'namespace': encoding.fromlocal(namespace),
336 'key': encoding.fromlocal(key),
336 'key': encoding.fromlocal(key),
337 'old': encoding.fromlocal(old),
337 'old': encoding.fromlocal(old),
338 'new': encoding.fromlocal(new)}, f
338 'new': encoding.fromlocal(new)}, f
339 d = f.value
339 d = f.value
340 d, output = d.split('\n', 1)
340 d, output = d.split('\n', 1)
341 try:
341 try:
342 d = bool(int(d))
342 d = bool(int(d))
343 except ValueError:
343 except ValueError:
344 raise error.ResponseError(
344 raise error.ResponseError(
345 _('push failed (unexpected response):'), d)
345 _('push failed (unexpected response):'), d)
346 for l in output.splitlines(True):
346 for l in output.splitlines(True):
347 self.ui.status(_('remote: '), l)
347 self.ui.status(_('remote: '), l)
348 yield d
348 yield d
349
349
350 @batchable
350 @batchable
351 def listkeys(self, namespace):
351 def listkeys(self, namespace):
352 if not self.capable('pushkey'):
352 if not self.capable('pushkey'):
353 yield {}, None
353 yield {}, None
354 f = future()
354 f = future()
355 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
355 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
356 yield {'namespace': encoding.fromlocal(namespace)}, f
356 yield {'namespace': encoding.fromlocal(namespace)}, f
357 d = f.value
357 d = f.value
358 self.ui.debug('received listkey for "%s": %i bytes\n'
358 self.ui.debug('received listkey for "%s": %i bytes\n'
359 % (namespace, len(d)))
359 % (namespace, len(d)))
360 yield pushkeymod.decodekeys(d)
360 yield pushkeymod.decodekeys(d)
361
361
362 def stream_out(self):
362 def stream_out(self):
363 return self._callstream('stream_out')
363 return self._callstream('stream_out')
364
364
365 def changegroup(self, nodes, kind):
365 def changegroup(self, nodes, kind):
366 n = encodelist(nodes)
366 n = encodelist(nodes)
367 f = self._callcompressable("changegroup", roots=n)
367 f = self._callcompressable("changegroup", roots=n)
368 return changegroupmod.cg1unpacker(f, 'UN')
368 return changegroupmod.cg1unpacker(f, 'UN')
369
369
370 def changegroupsubset(self, bases, heads, kind):
370 def changegroupsubset(self, bases, heads, kind):
371 self.requirecap('changegroupsubset', _('look up remote changes'))
371 self.requirecap('changegroupsubset', _('look up remote changes'))
372 bases = encodelist(bases)
372 bases = encodelist(bases)
373 heads = encodelist(heads)
373 heads = encodelist(heads)
374 f = self._callcompressable("changegroupsubset",
374 f = self._callcompressable("changegroupsubset",
375 bases=bases, heads=heads)
375 bases=bases, heads=heads)
376 return changegroupmod.cg1unpacker(f, 'UN')
376 return changegroupmod.cg1unpacker(f, 'UN')
377
377
378 def getbundle(self, source, **kwargs):
378 def getbundle(self, source, **kwargs):
379 self.requirecap('getbundle', _('look up remote changes'))
379 self.requirecap('getbundle', _('look up remote changes'))
380 opts = {}
380 opts = {}
381 bundlecaps = kwargs.get('bundlecaps')
381 bundlecaps = kwargs.get('bundlecaps')
382 if bundlecaps is not None:
382 if bundlecaps is not None:
383 kwargs['bundlecaps'] = sorted(bundlecaps)
383 kwargs['bundlecaps'] = sorted(bundlecaps)
384 else:
384 else:
385 bundlecaps = () # kwargs could have it to None
385 bundlecaps = () # kwargs could have it to None
386 for key, value in kwargs.iteritems():
386 for key, value in kwargs.iteritems():
387 if value is None:
387 if value is None:
388 continue
388 continue
389 keytype = gboptsmap.get(key)
389 keytype = gboptsmap.get(key)
390 if keytype is None:
390 if keytype is None:
391 assert False, 'unexpected'
391 assert False, 'unexpected'
392 elif keytype == 'nodes':
392 elif keytype == 'nodes':
393 value = encodelist(value)
393 value = encodelist(value)
394 elif keytype in ('csv', 'scsv'):
394 elif keytype in ('csv', 'scsv'):
395 value = ','.join(value)
395 value = ','.join(value)
396 elif keytype == 'boolean':
396 elif keytype == 'boolean':
397 value = '%i' % bool(value)
397 value = '%i' % bool(value)
398 elif keytype != 'plain':
398 elif keytype != 'plain':
399 raise KeyError('unknown getbundle option type %s'
399 raise KeyError('unknown getbundle option type %s'
400 % keytype)
400 % keytype)
401 opts[key] = value
401 opts[key] = value
402 f = self._callcompressable("getbundle", **opts)
402 f = self._callcompressable("getbundle", **opts)
403 if any((cap.startswith('HG2') for cap in bundlecaps)):
403 if any((cap.startswith('HG2') for cap in bundlecaps)):
404 return bundle2.getunbundler(self.ui, f)
404 return bundle2.getunbundler(self.ui, f)
405 else:
405 else:
406 return changegroupmod.cg1unpacker(f, 'UN')
406 return changegroupmod.cg1unpacker(f, 'UN')
407
407
408 def unbundle(self, cg, heads, url):
408 def unbundle(self, cg, heads, url):
409 '''Send cg (a readable file-like object representing the
409 '''Send cg (a readable file-like object representing the
410 changegroup to push, typically a chunkbuffer object) to the
410 changegroup to push, typically a chunkbuffer object) to the
411 remote server as a bundle.
411 remote server as a bundle.
412
412
413 When pushing a bundle10 stream, return an integer indicating the
413 When pushing a bundle10 stream, return an integer indicating the
414 result of the push (see localrepository.addchangegroup()).
414 result of the push (see changegroup.apply()).
415
415
416 When pushing a bundle20 stream, return a bundle20 stream.
416 When pushing a bundle20 stream, return a bundle20 stream.
417
417
418 `url` is the url the client thinks it's pushing to, which is
418 `url` is the url the client thinks it's pushing to, which is
419 visible to hooks.
419 visible to hooks.
420 '''
420 '''
421
421
422 if heads != ['force'] and self.capable('unbundlehash'):
422 if heads != ['force'] and self.capable('unbundlehash'):
423 heads = encodelist(['hashed',
423 heads = encodelist(['hashed',
424 hashlib.sha1(''.join(sorted(heads))).digest()])
424 hashlib.sha1(''.join(sorted(heads))).digest()])
425 else:
425 else:
426 heads = encodelist(heads)
426 heads = encodelist(heads)
427
427
428 if util.safehasattr(cg, 'deltaheader'):
428 if util.safehasattr(cg, 'deltaheader'):
429 # this a bundle10, do the old style call sequence
429 # this a bundle10, do the old style call sequence
430 ret, output = self._callpush("unbundle", cg, heads=heads)
430 ret, output = self._callpush("unbundle", cg, heads=heads)
431 if ret == "":
431 if ret == "":
432 raise error.ResponseError(
432 raise error.ResponseError(
433 _('push failed:'), output)
433 _('push failed:'), output)
434 try:
434 try:
435 ret = int(ret)
435 ret = int(ret)
436 except ValueError:
436 except ValueError:
437 raise error.ResponseError(
437 raise error.ResponseError(
438 _('push failed (unexpected response):'), ret)
438 _('push failed (unexpected response):'), ret)
439
439
440 for l in output.splitlines(True):
440 for l in output.splitlines(True):
441 self.ui.status(_('remote: '), l)
441 self.ui.status(_('remote: '), l)
442 else:
442 else:
443 # bundle2 push. Send a stream, fetch a stream.
443 # bundle2 push. Send a stream, fetch a stream.
444 stream = self._calltwowaystream('unbundle', cg, heads=heads)
444 stream = self._calltwowaystream('unbundle', cg, heads=heads)
445 ret = bundle2.getunbundler(self.ui, stream)
445 ret = bundle2.getunbundler(self.ui, stream)
446 return ret
446 return ret
447
447
448 def debugwireargs(self, one, two, three=None, four=None, five=None):
448 def debugwireargs(self, one, two, three=None, four=None, five=None):
449 # don't pass optional arguments left at their default value
449 # don't pass optional arguments left at their default value
450 opts = {}
450 opts = {}
451 if three is not None:
451 if three is not None:
452 opts['three'] = three
452 opts['three'] = three
453 if four is not None:
453 if four is not None:
454 opts['four'] = four
454 opts['four'] = four
455 return self._call('debugwireargs', one=one, two=two, **opts)
455 return self._call('debugwireargs', one=one, two=two, **opts)
456
456
457 def _call(self, cmd, **args):
457 def _call(self, cmd, **args):
458 """execute <cmd> on the server
458 """execute <cmd> on the server
459
459
460 The command is expected to return a simple string.
460 The command is expected to return a simple string.
461
461
462 returns the server reply as a string."""
462 returns the server reply as a string."""
463 raise NotImplementedError()
463 raise NotImplementedError()
464
464
465 def _callstream(self, cmd, **args):
465 def _callstream(self, cmd, **args):
466 """execute <cmd> on the server
466 """execute <cmd> on the server
467
467
468 The command is expected to return a stream. Note that if the
468 The command is expected to return a stream. Note that if the
469 command doesn't return a stream, _callstream behaves
469 command doesn't return a stream, _callstream behaves
470 differently for ssh and http peers.
470 differently for ssh and http peers.
471
471
472 returns the server reply as a file like object.
472 returns the server reply as a file like object.
473 """
473 """
474 raise NotImplementedError()
474 raise NotImplementedError()
475
475
476 def _callcompressable(self, cmd, **args):
476 def _callcompressable(self, cmd, **args):
477 """execute <cmd> on the server
477 """execute <cmd> on the server
478
478
479 The command is expected to return a stream.
479 The command is expected to return a stream.
480
480
481 The stream may have been compressed in some implementations. This
481 The stream may have been compressed in some implementations. This
482 function takes care of the decompression. This is the only difference
482 function takes care of the decompression. This is the only difference
483 with _callstream.
483 with _callstream.
484
484
485 returns the server reply as a file like object.
485 returns the server reply as a file like object.
486 """
486 """
487 raise NotImplementedError()
487 raise NotImplementedError()
488
488
489 def _callpush(self, cmd, fp, **args):
489 def _callpush(self, cmd, fp, **args):
490 """execute a <cmd> on server
490 """execute a <cmd> on server
491
491
492 The command is expected to be related to a push. Push has a special
492 The command is expected to be related to a push. Push has a special
493 return method.
493 return method.
494
494
495 returns the server reply as a (ret, output) tuple. ret is either
495 returns the server reply as a (ret, output) tuple. ret is either
496 empty (error) or a stringified int.
496 empty (error) or a stringified int.
497 """
497 """
498 raise NotImplementedError()
498 raise NotImplementedError()
499
499
500 def _calltwowaystream(self, cmd, fp, **args):
500 def _calltwowaystream(self, cmd, fp, **args):
501 """execute <cmd> on server
501 """execute <cmd> on server
502
502
503 The command will send a stream to the server and get a stream in reply.
503 The command will send a stream to the server and get a stream in reply.
504 """
504 """
505 raise NotImplementedError()
505 raise NotImplementedError()
506
506
507 def _abort(self, exception):
507 def _abort(self, exception):
508 """clearly abort the wire protocol connection and raise the exception
508 """clearly abort the wire protocol connection and raise the exception
509 """
509 """
510 raise NotImplementedError()
510 raise NotImplementedError()
511
511
512 # server side
512 # server side
513
513
514 # wire protocol command can either return a string or one of these classes.
514 # wire protocol command can either return a string or one of these classes.
515 class streamres(object):
515 class streamres(object):
516 """wireproto reply: binary stream
516 """wireproto reply: binary stream
517
517
518 The call was successful and the result is a stream.
518 The call was successful and the result is a stream.
519
519
520 Accepts either a generator or an object with a ``read(size)`` method.
520 Accepts either a generator or an object with a ``read(size)`` method.
521
521
522 ``v1compressible`` indicates whether this data can be compressed to
522 ``v1compressible`` indicates whether this data can be compressed to
523 "version 1" clients (technically: HTTP peers using
523 "version 1" clients (technically: HTTP peers using
524 application/mercurial-0.1 media type). This flag should NOT be used on
524 application/mercurial-0.1 media type). This flag should NOT be used on
525 new commands because new clients should support a more modern compression
525 new commands because new clients should support a more modern compression
526 mechanism.
526 mechanism.
527 """
527 """
528 def __init__(self, gen=None, reader=None, v1compressible=False):
528 def __init__(self, gen=None, reader=None, v1compressible=False):
529 self.gen = gen
529 self.gen = gen
530 self.reader = reader
530 self.reader = reader
531 self.v1compressible = v1compressible
531 self.v1compressible = v1compressible
532
532
533 class pushres(object):
533 class pushres(object):
534 """wireproto reply: success with simple integer return
534 """wireproto reply: success with simple integer return
535
535
536 The call was successful and returned an integer contained in `self.res`.
536 The call was successful and returned an integer contained in `self.res`.
537 """
537 """
538 def __init__(self, res):
538 def __init__(self, res):
539 self.res = res
539 self.res = res
540
540
541 class pusherr(object):
541 class pusherr(object):
542 """wireproto reply: failure
542 """wireproto reply: failure
543
543
544 The call failed. The `self.res` attribute contains the error message.
544 The call failed. The `self.res` attribute contains the error message.
545 """
545 """
546 def __init__(self, res):
546 def __init__(self, res):
547 self.res = res
547 self.res = res
548
548
549 class ooberror(object):
549 class ooberror(object):
550 """wireproto reply: failure of a batch of operation
550 """wireproto reply: failure of a batch of operation
551
551
552 Something failed during a batch call. The error message is stored in
552 Something failed during a batch call. The error message is stored in
553 `self.message`.
553 `self.message`.
554 """
554 """
555 def __init__(self, message):
555 def __init__(self, message):
556 self.message = message
556 self.message = message
557
557
558 def getdispatchrepo(repo, proto, command):
558 def getdispatchrepo(repo, proto, command):
559 """Obtain the repo used for processing wire protocol commands.
559 """Obtain the repo used for processing wire protocol commands.
560
560
561 The intent of this function is to serve as a monkeypatch point for
561 The intent of this function is to serve as a monkeypatch point for
562 extensions that need commands to operate on different repo views under
562 extensions that need commands to operate on different repo views under
563 specialized circumstances.
563 specialized circumstances.
564 """
564 """
565 return repo.filtered('served')
565 return repo.filtered('served')
566
566
567 def dispatch(repo, proto, command):
567 def dispatch(repo, proto, command):
568 repo = getdispatchrepo(repo, proto, command)
568 repo = getdispatchrepo(repo, proto, command)
569 func, spec = commands[command]
569 func, spec = commands[command]
570 args = proto.getargs(spec)
570 args = proto.getargs(spec)
571 return func(repo, proto, *args)
571 return func(repo, proto, *args)
572
572
573 def options(cmd, keys, others):
573 def options(cmd, keys, others):
574 opts = {}
574 opts = {}
575 for k in keys:
575 for k in keys:
576 if k in others:
576 if k in others:
577 opts[k] = others[k]
577 opts[k] = others[k]
578 del others[k]
578 del others[k]
579 if others:
579 if others:
580 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
580 util.stderr.write("warning: %s ignored unexpected arguments %s\n"
581 % (cmd, ",".join(others)))
581 % (cmd, ",".join(others)))
582 return opts
582 return opts
583
583
584 def bundle1allowed(repo, action):
584 def bundle1allowed(repo, action):
585 """Whether a bundle1 operation is allowed from the server.
585 """Whether a bundle1 operation is allowed from the server.
586
586
587 Priority is:
587 Priority is:
588
588
589 1. server.bundle1gd.<action> (if generaldelta active)
589 1. server.bundle1gd.<action> (if generaldelta active)
590 2. server.bundle1.<action>
590 2. server.bundle1.<action>
591 3. server.bundle1gd (if generaldelta active)
591 3. server.bundle1gd (if generaldelta active)
592 4. server.bundle1
592 4. server.bundle1
593 """
593 """
594 ui = repo.ui
594 ui = repo.ui
595 gd = 'generaldelta' in repo.requirements
595 gd = 'generaldelta' in repo.requirements
596
596
597 if gd:
597 if gd:
598 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
598 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
599 if v is not None:
599 if v is not None:
600 return v
600 return v
601
601
602 v = ui.configbool('server', 'bundle1.%s' % action, None)
602 v = ui.configbool('server', 'bundle1.%s' % action, None)
603 if v is not None:
603 if v is not None:
604 return v
604 return v
605
605
606 if gd:
606 if gd:
607 v = ui.configbool('server', 'bundle1gd', None)
607 v = ui.configbool('server', 'bundle1gd', None)
608 if v is not None:
608 if v is not None:
609 return v
609 return v
610
610
611 return ui.configbool('server', 'bundle1', True)
611 return ui.configbool('server', 'bundle1', True)
612
612
613 def supportedcompengines(ui, proto, role):
613 def supportedcompengines(ui, proto, role):
614 """Obtain the list of supported compression engines for a request."""
614 """Obtain the list of supported compression engines for a request."""
615 assert role in (util.CLIENTROLE, util.SERVERROLE)
615 assert role in (util.CLIENTROLE, util.SERVERROLE)
616
616
617 compengines = util.compengines.supportedwireengines(role)
617 compengines = util.compengines.supportedwireengines(role)
618
618
619 # Allow config to override default list and ordering.
619 # Allow config to override default list and ordering.
620 if role == util.SERVERROLE:
620 if role == util.SERVERROLE:
621 configengines = ui.configlist('server', 'compressionengines')
621 configengines = ui.configlist('server', 'compressionengines')
622 config = 'server.compressionengines'
622 config = 'server.compressionengines'
623 else:
623 else:
624 # This is currently implemented mainly to facilitate testing. In most
624 # This is currently implemented mainly to facilitate testing. In most
625 # cases, the server should be in charge of choosing a compression engine
625 # cases, the server should be in charge of choosing a compression engine
626 # because a server has the most to lose from a sub-optimal choice. (e.g.
626 # because a server has the most to lose from a sub-optimal choice. (e.g.
627 # CPU DoS due to an expensive engine or a network DoS due to poor
627 # CPU DoS due to an expensive engine or a network DoS due to poor
628 # compression ratio).
628 # compression ratio).
629 configengines = ui.configlist('experimental',
629 configengines = ui.configlist('experimental',
630 'clientcompressionengines')
630 'clientcompressionengines')
631 config = 'experimental.clientcompressionengines'
631 config = 'experimental.clientcompressionengines'
632
632
633 # No explicit config. Filter out the ones that aren't supposed to be
633 # No explicit config. Filter out the ones that aren't supposed to be
634 # advertised and return default ordering.
634 # advertised and return default ordering.
635 if not configengines:
635 if not configengines:
636 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
636 attr = 'serverpriority' if role == util.SERVERROLE else 'clientpriority'
637 return [e for e in compengines
637 return [e for e in compengines
638 if getattr(e.wireprotosupport(), attr) > 0]
638 if getattr(e.wireprotosupport(), attr) > 0]
639
639
640 # If compression engines are listed in the config, assume there is a good
640 # If compression engines are listed in the config, assume there is a good
641 # reason for it (like server operators wanting to achieve specific
641 # reason for it (like server operators wanting to achieve specific
642 # performance characteristics). So fail fast if the config references
642 # performance characteristics). So fail fast if the config references
643 # unusable compression engines.
643 # unusable compression engines.
644 validnames = set(e.name() for e in compengines)
644 validnames = set(e.name() for e in compengines)
645 invalidnames = set(e for e in configengines if e not in validnames)
645 invalidnames = set(e for e in configengines if e not in validnames)
646 if invalidnames:
646 if invalidnames:
647 raise error.Abort(_('invalid compression engine defined in %s: %s') %
647 raise error.Abort(_('invalid compression engine defined in %s: %s') %
648 (config, ', '.join(sorted(invalidnames))))
648 (config, ', '.join(sorted(invalidnames))))
649
649
650 compengines = [e for e in compengines if e.name() in configengines]
650 compengines = [e for e in compengines if e.name() in configengines]
651 compengines = sorted(compengines,
651 compengines = sorted(compengines,
652 key=lambda e: configengines.index(e.name()))
652 key=lambda e: configengines.index(e.name()))
653
653
654 if not compengines:
654 if not compengines:
655 raise error.Abort(_('%s config option does not specify any known '
655 raise error.Abort(_('%s config option does not specify any known '
656 'compression engines') % config,
656 'compression engines') % config,
657 hint=_('usable compression engines: %s') %
657 hint=_('usable compression engines: %s') %
658 ', '.sorted(validnames))
658 ', '.sorted(validnames))
659
659
660 return compengines
660 return compengines
661
661
662 # list of commands
662 # list of commands
663 commands = {}
663 commands = {}
664
664
665 def wireprotocommand(name, args=''):
665 def wireprotocommand(name, args=''):
666 """decorator for wire protocol command"""
666 """decorator for wire protocol command"""
667 def register(func):
667 def register(func):
668 commands[name] = (func, args)
668 commands[name] = (func, args)
669 return func
669 return func
670 return register
670 return register
671
671
672 @wireprotocommand('batch', 'cmds *')
672 @wireprotocommand('batch', 'cmds *')
673 def batch(repo, proto, cmds, others):
673 def batch(repo, proto, cmds, others):
674 repo = repo.filtered("served")
674 repo = repo.filtered("served")
675 res = []
675 res = []
676 for pair in cmds.split(';'):
676 for pair in cmds.split(';'):
677 op, args = pair.split(' ', 1)
677 op, args = pair.split(' ', 1)
678 vals = {}
678 vals = {}
679 for a in args.split(','):
679 for a in args.split(','):
680 if a:
680 if a:
681 n, v = a.split('=')
681 n, v = a.split('=')
682 vals[unescapearg(n)] = unescapearg(v)
682 vals[unescapearg(n)] = unescapearg(v)
683 func, spec = commands[op]
683 func, spec = commands[op]
684 if spec:
684 if spec:
685 keys = spec.split()
685 keys = spec.split()
686 data = {}
686 data = {}
687 for k in keys:
687 for k in keys:
688 if k == '*':
688 if k == '*':
689 star = {}
689 star = {}
690 for key in vals.keys():
690 for key in vals.keys():
691 if key not in keys:
691 if key not in keys:
692 star[key] = vals[key]
692 star[key] = vals[key]
693 data['*'] = star
693 data['*'] = star
694 else:
694 else:
695 data[k] = vals[k]
695 data[k] = vals[k]
696 result = func(repo, proto, *[data[k] for k in keys])
696 result = func(repo, proto, *[data[k] for k in keys])
697 else:
697 else:
698 result = func(repo, proto)
698 result = func(repo, proto)
699 if isinstance(result, ooberror):
699 if isinstance(result, ooberror):
700 return result
700 return result
701 res.append(escapearg(result))
701 res.append(escapearg(result))
702 return ';'.join(res)
702 return ';'.join(res)
703
703
704 @wireprotocommand('between', 'pairs')
704 @wireprotocommand('between', 'pairs')
705 def between(repo, proto, pairs):
705 def between(repo, proto, pairs):
706 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
706 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
707 r = []
707 r = []
708 for b in repo.between(pairs):
708 for b in repo.between(pairs):
709 r.append(encodelist(b) + "\n")
709 r.append(encodelist(b) + "\n")
710 return "".join(r)
710 return "".join(r)
711
711
712 @wireprotocommand('branchmap')
712 @wireprotocommand('branchmap')
713 def branchmap(repo, proto):
713 def branchmap(repo, proto):
714 branchmap = repo.branchmap()
714 branchmap = repo.branchmap()
715 heads = []
715 heads = []
716 for branch, nodes in branchmap.iteritems():
716 for branch, nodes in branchmap.iteritems():
717 branchname = urlreq.quote(encoding.fromlocal(branch))
717 branchname = urlreq.quote(encoding.fromlocal(branch))
718 branchnodes = encodelist(nodes)
718 branchnodes = encodelist(nodes)
719 heads.append('%s %s' % (branchname, branchnodes))
719 heads.append('%s %s' % (branchname, branchnodes))
720 return '\n'.join(heads)
720 return '\n'.join(heads)
721
721
722 @wireprotocommand('branches', 'nodes')
722 @wireprotocommand('branches', 'nodes')
723 def branches(repo, proto, nodes):
723 def branches(repo, proto, nodes):
724 nodes = decodelist(nodes)
724 nodes = decodelist(nodes)
725 r = []
725 r = []
726 for b in repo.branches(nodes):
726 for b in repo.branches(nodes):
727 r.append(encodelist(b) + "\n")
727 r.append(encodelist(b) + "\n")
728 return "".join(r)
728 return "".join(r)
729
729
730 @wireprotocommand('clonebundles', '')
730 @wireprotocommand('clonebundles', '')
731 def clonebundles(repo, proto):
731 def clonebundles(repo, proto):
732 """Server command for returning info for available bundles to seed clones.
732 """Server command for returning info for available bundles to seed clones.
733
733
734 Clients will parse this response and determine what bundle to fetch.
734 Clients will parse this response and determine what bundle to fetch.
735
735
736 Extensions may wrap this command to filter or dynamically emit data
736 Extensions may wrap this command to filter or dynamically emit data
737 depending on the request. e.g. you could advertise URLs for the closest
737 depending on the request. e.g. you could advertise URLs for the closest
738 data center given the client's IP address.
738 data center given the client's IP address.
739 """
739 """
740 return repo.vfs.tryread('clonebundles.manifest')
740 return repo.vfs.tryread('clonebundles.manifest')
741
741
742 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
742 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
743 'known', 'getbundle', 'unbundlehash', 'batch']
743 'known', 'getbundle', 'unbundlehash', 'batch']
744
744
745 def _capabilities(repo, proto):
745 def _capabilities(repo, proto):
746 """return a list of capabilities for a repo
746 """return a list of capabilities for a repo
747
747
748 This function exists to allow extensions to easily wrap capabilities
748 This function exists to allow extensions to easily wrap capabilities
749 computation
749 computation
750
750
751 - returns a lists: easy to alter
751 - returns a lists: easy to alter
752 - change done here will be propagated to both `capabilities` and `hello`
752 - change done here will be propagated to both `capabilities` and `hello`
753 command without any other action needed.
753 command without any other action needed.
754 """
754 """
755 # copy to prevent modification of the global list
755 # copy to prevent modification of the global list
756 caps = list(wireprotocaps)
756 caps = list(wireprotocaps)
757 if streamclone.allowservergeneration(repo):
757 if streamclone.allowservergeneration(repo):
758 if repo.ui.configbool('server', 'preferuncompressed', False):
758 if repo.ui.configbool('server', 'preferuncompressed', False):
759 caps.append('stream-preferred')
759 caps.append('stream-preferred')
760 requiredformats = repo.requirements & repo.supportedformats
760 requiredformats = repo.requirements & repo.supportedformats
761 # if our local revlogs are just revlogv1, add 'stream' cap
761 # if our local revlogs are just revlogv1, add 'stream' cap
762 if not requiredformats - {'revlogv1'}:
762 if not requiredformats - {'revlogv1'}:
763 caps.append('stream')
763 caps.append('stream')
764 # otherwise, add 'streamreqs' detailing our local revlog format
764 # otherwise, add 'streamreqs' detailing our local revlog format
765 else:
765 else:
766 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
766 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
767 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
767 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
768 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
768 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
769 caps.append('bundle2=' + urlreq.quote(capsblob))
769 caps.append('bundle2=' + urlreq.quote(capsblob))
770 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
770 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
771
771
772 if proto.name == 'http':
772 if proto.name == 'http':
773 caps.append('httpheader=%d' %
773 caps.append('httpheader=%d' %
774 repo.ui.configint('server', 'maxhttpheaderlen', 1024))
774 repo.ui.configint('server', 'maxhttpheaderlen', 1024))
775 if repo.ui.configbool('experimental', 'httppostargs', False):
775 if repo.ui.configbool('experimental', 'httppostargs', False):
776 caps.append('httppostargs')
776 caps.append('httppostargs')
777
777
778 # FUTURE advertise 0.2rx once support is implemented
778 # FUTURE advertise 0.2rx once support is implemented
779 # FUTURE advertise minrx and mintx after consulting config option
779 # FUTURE advertise minrx and mintx after consulting config option
780 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
780 caps.append('httpmediatype=0.1rx,0.1tx,0.2tx')
781
781
782 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
782 compengines = supportedcompengines(repo.ui, proto, util.SERVERROLE)
783 if compengines:
783 if compengines:
784 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
784 comptypes = ','.join(urlreq.quote(e.wireprotosupport().name)
785 for e in compengines)
785 for e in compengines)
786 caps.append('compression=%s' % comptypes)
786 caps.append('compression=%s' % comptypes)
787
787
788 return caps
788 return caps
789
789
790 # If you are writing an extension and consider wrapping this function. Wrap
790 # If you are writing an extension and consider wrapping this function. Wrap
791 # `_capabilities` instead.
791 # `_capabilities` instead.
792 @wireprotocommand('capabilities')
792 @wireprotocommand('capabilities')
793 def capabilities(repo, proto):
793 def capabilities(repo, proto):
794 return ' '.join(_capabilities(repo, proto))
794 return ' '.join(_capabilities(repo, proto))
795
795
796 @wireprotocommand('changegroup', 'roots')
796 @wireprotocommand('changegroup', 'roots')
797 def changegroup(repo, proto, roots):
797 def changegroup(repo, proto, roots):
798 nodes = decodelist(roots)
798 nodes = decodelist(roots)
799 cg = changegroupmod.changegroup(repo, nodes, 'serve')
799 cg = changegroupmod.changegroup(repo, nodes, 'serve')
800 return streamres(reader=cg, v1compressible=True)
800 return streamres(reader=cg, v1compressible=True)
801
801
802 @wireprotocommand('changegroupsubset', 'bases heads')
802 @wireprotocommand('changegroupsubset', 'bases heads')
803 def changegroupsubset(repo, proto, bases, heads):
803 def changegroupsubset(repo, proto, bases, heads):
804 bases = decodelist(bases)
804 bases = decodelist(bases)
805 heads = decodelist(heads)
805 heads = decodelist(heads)
806 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
806 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
807 return streamres(reader=cg, v1compressible=True)
807 return streamres(reader=cg, v1compressible=True)
808
808
809 @wireprotocommand('debugwireargs', 'one two *')
809 @wireprotocommand('debugwireargs', 'one two *')
810 def debugwireargs(repo, proto, one, two, others):
810 def debugwireargs(repo, proto, one, two, others):
811 # only accept optional args from the known set
811 # only accept optional args from the known set
812 opts = options('debugwireargs', ['three', 'four'], others)
812 opts = options('debugwireargs', ['three', 'four'], others)
813 return repo.debugwireargs(one, two, **opts)
813 return repo.debugwireargs(one, two, **opts)
814
814
815 @wireprotocommand('getbundle', '*')
815 @wireprotocommand('getbundle', '*')
816 def getbundle(repo, proto, others):
816 def getbundle(repo, proto, others):
817 opts = options('getbundle', gboptsmap.keys(), others)
817 opts = options('getbundle', gboptsmap.keys(), others)
818 for k, v in opts.iteritems():
818 for k, v in opts.iteritems():
819 keytype = gboptsmap[k]
819 keytype = gboptsmap[k]
820 if keytype == 'nodes':
820 if keytype == 'nodes':
821 opts[k] = decodelist(v)
821 opts[k] = decodelist(v)
822 elif keytype == 'csv':
822 elif keytype == 'csv':
823 opts[k] = list(v.split(','))
823 opts[k] = list(v.split(','))
824 elif keytype == 'scsv':
824 elif keytype == 'scsv':
825 opts[k] = set(v.split(','))
825 opts[k] = set(v.split(','))
826 elif keytype == 'boolean':
826 elif keytype == 'boolean':
827 # Client should serialize False as '0', which is a non-empty string
827 # Client should serialize False as '0', which is a non-empty string
828 # so it evaluates as a True bool.
828 # so it evaluates as a True bool.
829 if v == '0':
829 if v == '0':
830 opts[k] = False
830 opts[k] = False
831 else:
831 else:
832 opts[k] = bool(v)
832 opts[k] = bool(v)
833 elif keytype != 'plain':
833 elif keytype != 'plain':
834 raise KeyError('unknown getbundle option type %s'
834 raise KeyError('unknown getbundle option type %s'
835 % keytype)
835 % keytype)
836
836
837 if not bundle1allowed(repo, 'pull'):
837 if not bundle1allowed(repo, 'pull'):
838 if not exchange.bundle2requested(opts.get('bundlecaps')):
838 if not exchange.bundle2requested(opts.get('bundlecaps')):
839 if proto.name == 'http':
839 if proto.name == 'http':
840 return ooberror(bundle2required)
840 return ooberror(bundle2required)
841 raise error.Abort(bundle2requiredmain,
841 raise error.Abort(bundle2requiredmain,
842 hint=bundle2requiredhint)
842 hint=bundle2requiredhint)
843
843
844 try:
844 try:
845 if repo.ui.configbool('server', 'disablefullbundle', False):
845 if repo.ui.configbool('server', 'disablefullbundle', False):
846 # Check to see if this is a full clone.
846 # Check to see if this is a full clone.
847 clheads = set(repo.changelog.heads())
847 clheads = set(repo.changelog.heads())
848 heads = set(opts.get('heads', set()))
848 heads = set(opts.get('heads', set()))
849 common = set(opts.get('common', set()))
849 common = set(opts.get('common', set()))
850 common.discard(nullid)
850 common.discard(nullid)
851 if not common and clheads == heads:
851 if not common and clheads == heads:
852 raise error.Abort(
852 raise error.Abort(
853 _('server has pull-based clones disabled'),
853 _('server has pull-based clones disabled'),
854 hint=_('remove --pull if specified or upgrade Mercurial'))
854 hint=_('remove --pull if specified or upgrade Mercurial'))
855
855
856 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
856 chunks = exchange.getbundlechunks(repo, 'serve', **opts)
857 except error.Abort as exc:
857 except error.Abort as exc:
858 # cleanly forward Abort error to the client
858 # cleanly forward Abort error to the client
859 if not exchange.bundle2requested(opts.get('bundlecaps')):
859 if not exchange.bundle2requested(opts.get('bundlecaps')):
860 if proto.name == 'http':
860 if proto.name == 'http':
861 return ooberror(str(exc) + '\n')
861 return ooberror(str(exc) + '\n')
862 raise # cannot do better for bundle1 + ssh
862 raise # cannot do better for bundle1 + ssh
863 # bundle2 request expect a bundle2 reply
863 # bundle2 request expect a bundle2 reply
864 bundler = bundle2.bundle20(repo.ui)
864 bundler = bundle2.bundle20(repo.ui)
865 manargs = [('message', str(exc))]
865 manargs = [('message', str(exc))]
866 advargs = []
866 advargs = []
867 if exc.hint is not None:
867 if exc.hint is not None:
868 advargs.append(('hint', exc.hint))
868 advargs.append(('hint', exc.hint))
869 bundler.addpart(bundle2.bundlepart('error:abort',
869 bundler.addpart(bundle2.bundlepart('error:abort',
870 manargs, advargs))
870 manargs, advargs))
871 return streamres(gen=bundler.getchunks(), v1compressible=True)
871 return streamres(gen=bundler.getchunks(), v1compressible=True)
872 return streamres(gen=chunks, v1compressible=True)
872 return streamres(gen=chunks, v1compressible=True)
873
873
874 @wireprotocommand('heads')
874 @wireprotocommand('heads')
875 def heads(repo, proto):
875 def heads(repo, proto):
876 h = repo.heads()
876 h = repo.heads()
877 return encodelist(h) + "\n"
877 return encodelist(h) + "\n"
878
878
879 @wireprotocommand('hello')
879 @wireprotocommand('hello')
880 def hello(repo, proto):
880 def hello(repo, proto):
881 '''the hello command returns a set of lines describing various
881 '''the hello command returns a set of lines describing various
882 interesting things about the server, in an RFC822-like format.
882 interesting things about the server, in an RFC822-like format.
883 Currently the only one defined is "capabilities", which
883 Currently the only one defined is "capabilities", which
884 consists of a line in the form:
884 consists of a line in the form:
885
885
886 capabilities: space separated list of tokens
886 capabilities: space separated list of tokens
887 '''
887 '''
888 return "capabilities: %s\n" % (capabilities(repo, proto))
888 return "capabilities: %s\n" % (capabilities(repo, proto))
889
889
890 @wireprotocommand('listkeys', 'namespace')
890 @wireprotocommand('listkeys', 'namespace')
891 def listkeys(repo, proto, namespace):
891 def listkeys(repo, proto, namespace):
892 d = repo.listkeys(encoding.tolocal(namespace)).items()
892 d = repo.listkeys(encoding.tolocal(namespace)).items()
893 return pushkeymod.encodekeys(d)
893 return pushkeymod.encodekeys(d)
894
894
895 @wireprotocommand('lookup', 'key')
895 @wireprotocommand('lookup', 'key')
896 def lookup(repo, proto, key):
896 def lookup(repo, proto, key):
897 try:
897 try:
898 k = encoding.tolocal(key)
898 k = encoding.tolocal(key)
899 c = repo[k]
899 c = repo[k]
900 r = c.hex()
900 r = c.hex()
901 success = 1
901 success = 1
902 except Exception as inst:
902 except Exception as inst:
903 r = str(inst)
903 r = str(inst)
904 success = 0
904 success = 0
905 return "%s %s\n" % (success, r)
905 return "%s %s\n" % (success, r)
906
906
907 @wireprotocommand('known', 'nodes *')
907 @wireprotocommand('known', 'nodes *')
908 def known(repo, proto, nodes, others):
908 def known(repo, proto, nodes, others):
909 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
909 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
910
910
911 @wireprotocommand('pushkey', 'namespace key old new')
911 @wireprotocommand('pushkey', 'namespace key old new')
912 def pushkey(repo, proto, namespace, key, old, new):
912 def pushkey(repo, proto, namespace, key, old, new):
913 # compatibility with pre-1.8 clients which were accidentally
913 # compatibility with pre-1.8 clients which were accidentally
914 # sending raw binary nodes rather than utf-8-encoded hex
914 # sending raw binary nodes rather than utf-8-encoded hex
915 if len(new) == 20 and util.escapestr(new) != new:
915 if len(new) == 20 and util.escapestr(new) != new:
916 # looks like it could be a binary node
916 # looks like it could be a binary node
917 try:
917 try:
918 new.decode('utf-8')
918 new.decode('utf-8')
919 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
919 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
920 except UnicodeDecodeError:
920 except UnicodeDecodeError:
921 pass # binary, leave unmodified
921 pass # binary, leave unmodified
922 else:
922 else:
923 new = encoding.tolocal(new) # normal path
923 new = encoding.tolocal(new) # normal path
924
924
925 if util.safehasattr(proto, 'restore'):
925 if util.safehasattr(proto, 'restore'):
926
926
927 proto.redirect()
927 proto.redirect()
928
928
929 try:
929 try:
930 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
930 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
931 encoding.tolocal(old), new) or False
931 encoding.tolocal(old), new) or False
932 except error.Abort:
932 except error.Abort:
933 r = False
933 r = False
934
934
935 output = proto.restore()
935 output = proto.restore()
936
936
937 return '%s\n%s' % (int(r), output)
937 return '%s\n%s' % (int(r), output)
938
938
939 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
939 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
940 encoding.tolocal(old), new)
940 encoding.tolocal(old), new)
941 return '%s\n' % int(r)
941 return '%s\n' % int(r)
942
942
943 @wireprotocommand('stream_out')
943 @wireprotocommand('stream_out')
944 def stream(repo, proto):
944 def stream(repo, proto):
945 '''If the server supports streaming clone, it advertises the "stream"
945 '''If the server supports streaming clone, it advertises the "stream"
946 capability with a value representing the version and flags of the repo
946 capability with a value representing the version and flags of the repo
947 it is serving. Client checks to see if it understands the format.
947 it is serving. Client checks to see if it understands the format.
948 '''
948 '''
949 if not streamclone.allowservergeneration(repo):
949 if not streamclone.allowservergeneration(repo):
950 return '1\n'
950 return '1\n'
951
951
952 def getstream(it):
952 def getstream(it):
953 yield '0\n'
953 yield '0\n'
954 for chunk in it:
954 for chunk in it:
955 yield chunk
955 yield chunk
956
956
957 try:
957 try:
958 # LockError may be raised before the first result is yielded. Don't
958 # LockError may be raised before the first result is yielded. Don't
959 # emit output until we're sure we got the lock successfully.
959 # emit output until we're sure we got the lock successfully.
960 it = streamclone.generatev1wireproto(repo)
960 it = streamclone.generatev1wireproto(repo)
961 return streamres(gen=getstream(it))
961 return streamres(gen=getstream(it))
962 except error.LockError:
962 except error.LockError:
963 return '2\n'
963 return '2\n'
964
964
965 @wireprotocommand('unbundle', 'heads')
965 @wireprotocommand('unbundle', 'heads')
966 def unbundle(repo, proto, heads):
966 def unbundle(repo, proto, heads):
967 their_heads = decodelist(heads)
967 their_heads = decodelist(heads)
968
968
969 try:
969 try:
970 proto.redirect()
970 proto.redirect()
971
971
972 exchange.check_heads(repo, their_heads, 'preparing changes')
972 exchange.check_heads(repo, their_heads, 'preparing changes')
973
973
974 # write bundle data to temporary file because it can be big
974 # write bundle data to temporary file because it can be big
975 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
975 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
976 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
976 fp = os.fdopen(fd, pycompat.sysstr('wb+'))
977 r = 0
977 r = 0
978 try:
978 try:
979 proto.getfile(fp)
979 proto.getfile(fp)
980 fp.seek(0)
980 fp.seek(0)
981 gen = exchange.readbundle(repo.ui, fp, None)
981 gen = exchange.readbundle(repo.ui, fp, None)
982 if (isinstance(gen, changegroupmod.cg1unpacker)
982 if (isinstance(gen, changegroupmod.cg1unpacker)
983 and not bundle1allowed(repo, 'push')):
983 and not bundle1allowed(repo, 'push')):
984 if proto.name == 'http':
984 if proto.name == 'http':
985 # need to special case http because stderr do not get to
985 # need to special case http because stderr do not get to
986 # the http client on failed push so we need to abuse some
986 # the http client on failed push so we need to abuse some
987 # other error type to make sure the message get to the
987 # other error type to make sure the message get to the
988 # user.
988 # user.
989 return ooberror(bundle2required)
989 return ooberror(bundle2required)
990 raise error.Abort(bundle2requiredmain,
990 raise error.Abort(bundle2requiredmain,
991 hint=bundle2requiredhint)
991 hint=bundle2requiredhint)
992
992
993 r = exchange.unbundle(repo, gen, their_heads, 'serve',
993 r = exchange.unbundle(repo, gen, their_heads, 'serve',
994 proto._client())
994 proto._client())
995 if util.safehasattr(r, 'addpart'):
995 if util.safehasattr(r, 'addpart'):
996 # The return looks streamable, we are in the bundle2 case and
996 # The return looks streamable, we are in the bundle2 case and
997 # should return a stream.
997 # should return a stream.
998 return streamres(gen=r.getchunks())
998 return streamres(gen=r.getchunks())
999 return pushres(r)
999 return pushres(r)
1000
1000
1001 finally:
1001 finally:
1002 fp.close()
1002 fp.close()
1003 os.unlink(tempname)
1003 os.unlink(tempname)
1004
1004
1005 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1005 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
1006 # handle non-bundle2 case first
1006 # handle non-bundle2 case first
1007 if not getattr(exc, 'duringunbundle2', False):
1007 if not getattr(exc, 'duringunbundle2', False):
1008 try:
1008 try:
1009 raise
1009 raise
1010 except error.Abort:
1010 except error.Abort:
1011 # The old code we moved used util.stderr directly.
1011 # The old code we moved used util.stderr directly.
1012 # We did not change it to minimise code change.
1012 # We did not change it to minimise code change.
1013 # This need to be moved to something proper.
1013 # This need to be moved to something proper.
1014 # Feel free to do it.
1014 # Feel free to do it.
1015 util.stderr.write("abort: %s\n" % exc)
1015 util.stderr.write("abort: %s\n" % exc)
1016 if exc.hint is not None:
1016 if exc.hint is not None:
1017 util.stderr.write("(%s)\n" % exc.hint)
1017 util.stderr.write("(%s)\n" % exc.hint)
1018 return pushres(0)
1018 return pushres(0)
1019 except error.PushRaced:
1019 except error.PushRaced:
1020 return pusherr(str(exc))
1020 return pusherr(str(exc))
1021
1021
1022 bundler = bundle2.bundle20(repo.ui)
1022 bundler = bundle2.bundle20(repo.ui)
1023 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1023 for out in getattr(exc, '_bundle2salvagedoutput', ()):
1024 bundler.addpart(out)
1024 bundler.addpart(out)
1025 try:
1025 try:
1026 try:
1026 try:
1027 raise
1027 raise
1028 except error.PushkeyFailed as exc:
1028 except error.PushkeyFailed as exc:
1029 # check client caps
1029 # check client caps
1030 remotecaps = getattr(exc, '_replycaps', None)
1030 remotecaps = getattr(exc, '_replycaps', None)
1031 if (remotecaps is not None
1031 if (remotecaps is not None
1032 and 'pushkey' not in remotecaps.get('error', ())):
1032 and 'pushkey' not in remotecaps.get('error', ())):
1033 # no support remote side, fallback to Abort handler.
1033 # no support remote side, fallback to Abort handler.
1034 raise
1034 raise
1035 part = bundler.newpart('error:pushkey')
1035 part = bundler.newpart('error:pushkey')
1036 part.addparam('in-reply-to', exc.partid)
1036 part.addparam('in-reply-to', exc.partid)
1037 if exc.namespace is not None:
1037 if exc.namespace is not None:
1038 part.addparam('namespace', exc.namespace, mandatory=False)
1038 part.addparam('namespace', exc.namespace, mandatory=False)
1039 if exc.key is not None:
1039 if exc.key is not None:
1040 part.addparam('key', exc.key, mandatory=False)
1040 part.addparam('key', exc.key, mandatory=False)
1041 if exc.new is not None:
1041 if exc.new is not None:
1042 part.addparam('new', exc.new, mandatory=False)
1042 part.addparam('new', exc.new, mandatory=False)
1043 if exc.old is not None:
1043 if exc.old is not None:
1044 part.addparam('old', exc.old, mandatory=False)
1044 part.addparam('old', exc.old, mandatory=False)
1045 if exc.ret is not None:
1045 if exc.ret is not None:
1046 part.addparam('ret', exc.ret, mandatory=False)
1046 part.addparam('ret', exc.ret, mandatory=False)
1047 except error.BundleValueError as exc:
1047 except error.BundleValueError as exc:
1048 errpart = bundler.newpart('error:unsupportedcontent')
1048 errpart = bundler.newpart('error:unsupportedcontent')
1049 if exc.parttype is not None:
1049 if exc.parttype is not None:
1050 errpart.addparam('parttype', exc.parttype)
1050 errpart.addparam('parttype', exc.parttype)
1051 if exc.params:
1051 if exc.params:
1052 errpart.addparam('params', '\0'.join(exc.params))
1052 errpart.addparam('params', '\0'.join(exc.params))
1053 except error.Abort as exc:
1053 except error.Abort as exc:
1054 manargs = [('message', str(exc))]
1054 manargs = [('message', str(exc))]
1055 advargs = []
1055 advargs = []
1056 if exc.hint is not None:
1056 if exc.hint is not None:
1057 advargs.append(('hint', exc.hint))
1057 advargs.append(('hint', exc.hint))
1058 bundler.addpart(bundle2.bundlepart('error:abort',
1058 bundler.addpart(bundle2.bundlepart('error:abort',
1059 manargs, advargs))
1059 manargs, advargs))
1060 except error.PushRaced as exc:
1060 except error.PushRaced as exc:
1061 bundler.newpart('error:pushraced', [('message', str(exc))])
1061 bundler.newpart('error:pushraced', [('message', str(exc))])
1062 return streamres(gen=bundler.getchunks())
1062 return streamres(gen=bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now