##// END OF EJS Templates
wireproto: unescape argument names in batch command (BC)...
Gregory Szorc -
r29734:62e2e048 default
parent child Browse files
Show More
@@ -1,957 +1,962
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import itertools
11 import itertools
12 import os
12 import os
13 import sys
13 import sys
14 import tempfile
14 import tempfile
15
15
16 from .i18n import _
16 from .i18n import _
17 from .node import (
17 from .node import (
18 bin,
18 bin,
19 hex,
19 hex,
20 )
20 )
21
21
22 from . import (
22 from . import (
23 bundle2,
23 bundle2,
24 changegroup as changegroupmod,
24 changegroup as changegroupmod,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 streamclone,
30 streamclone,
31 util,
31 util,
32 )
32 )
33
33
34 urlerr = util.urlerr
34 urlerr = util.urlerr
35 urlreq = util.urlreq
35 urlreq = util.urlreq
36
36
37 bundle2required = _(
37 bundle2required = _(
38 'incompatible Mercurial client; bundle2 required\n'
38 'incompatible Mercurial client; bundle2 required\n'
39 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
39 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
40
40
41 class abstractserverproto(object):
41 class abstractserverproto(object):
42 """abstract class that summarizes the protocol API
42 """abstract class that summarizes the protocol API
43
43
44 Used as reference and documentation.
44 Used as reference and documentation.
45 """
45 """
46
46
47 def getargs(self, args):
47 def getargs(self, args):
48 """return the value for arguments in <args>
48 """return the value for arguments in <args>
49
49
50 returns a list of values (same order as <args>)"""
50 returns a list of values (same order as <args>)"""
51 raise NotImplementedError()
51 raise NotImplementedError()
52
52
53 def getfile(self, fp):
53 def getfile(self, fp):
54 """write the whole content of a file into a file like object
54 """write the whole content of a file into a file like object
55
55
56 The file is in the form::
56 The file is in the form::
57
57
58 (<chunk-size>\n<chunk>)+0\n
58 (<chunk-size>\n<chunk>)+0\n
59
59
60 chunk size is the ascii version of the int.
60 chunk size is the ascii version of the int.
61 """
61 """
62 raise NotImplementedError()
62 raise NotImplementedError()
63
63
64 def redirect(self):
64 def redirect(self):
65 """may setup interception for stdout and stderr
65 """may setup interception for stdout and stderr
66
66
67 See also the `restore` method."""
67 See also the `restore` method."""
68 raise NotImplementedError()
68 raise NotImplementedError()
69
69
70 # If the `redirect` function does install interception, the `restore`
70 # If the `redirect` function does install interception, the `restore`
71 # function MUST be defined. If interception is not used, this function
71 # function MUST be defined. If interception is not used, this function
72 # MUST NOT be defined.
72 # MUST NOT be defined.
73 #
73 #
74 # left commented here on purpose
74 # left commented here on purpose
75 #
75 #
76 #def restore(self):
76 #def restore(self):
77 # """reinstall previous stdout and stderr and return intercepted stdout
77 # """reinstall previous stdout and stderr and return intercepted stdout
78 # """
78 # """
79 # raise NotImplementedError()
79 # raise NotImplementedError()
80
80
81 def groupchunks(self, cg):
81 def groupchunks(self, cg):
82 """return 4096 chunks from a changegroup object
82 """return 4096 chunks from a changegroup object
83
83
84 Some protocols may have compressed the contents."""
84 Some protocols may have compressed the contents."""
85 raise NotImplementedError()
85 raise NotImplementedError()
86
86
87 class remotebatch(peer.batcher):
87 class remotebatch(peer.batcher):
88 '''batches the queued calls; uses as few roundtrips as possible'''
88 '''batches the queued calls; uses as few roundtrips as possible'''
89 def __init__(self, remote):
89 def __init__(self, remote):
90 '''remote must support _submitbatch(encbatch) and
90 '''remote must support _submitbatch(encbatch) and
91 _submitone(op, encargs)'''
91 _submitone(op, encargs)'''
92 peer.batcher.__init__(self)
92 peer.batcher.__init__(self)
93 self.remote = remote
93 self.remote = remote
94 def submit(self):
94 def submit(self):
95 req, rsp = [], []
95 req, rsp = [], []
96 for name, args, opts, resref in self.calls:
96 for name, args, opts, resref in self.calls:
97 mtd = getattr(self.remote, name)
97 mtd = getattr(self.remote, name)
98 batchablefn = getattr(mtd, 'batchable', None)
98 batchablefn = getattr(mtd, 'batchable', None)
99 if batchablefn is not None:
99 if batchablefn is not None:
100 batchable = batchablefn(mtd.im_self, *args, **opts)
100 batchable = batchablefn(mtd.im_self, *args, **opts)
101 encargsorres, encresref = next(batchable)
101 encargsorres, encresref = next(batchable)
102 if encresref:
102 if encresref:
103 req.append((name, encargsorres,))
103 req.append((name, encargsorres,))
104 rsp.append((batchable, encresref, resref,))
104 rsp.append((batchable, encresref, resref,))
105 else:
105 else:
106 resref.set(encargsorres)
106 resref.set(encargsorres)
107 else:
107 else:
108 if req:
108 if req:
109 self._submitreq(req, rsp)
109 self._submitreq(req, rsp)
110 req, rsp = [], []
110 req, rsp = [], []
111 resref.set(mtd(*args, **opts))
111 resref.set(mtd(*args, **opts))
112 if req:
112 if req:
113 self._submitreq(req, rsp)
113 self._submitreq(req, rsp)
114 def _submitreq(self, req, rsp):
114 def _submitreq(self, req, rsp):
115 encresults = self.remote._submitbatch(req)
115 encresults = self.remote._submitbatch(req)
116 for encres, r in zip(encresults, rsp):
116 for encres, r in zip(encresults, rsp):
117 batchable, encresref, resref = r
117 batchable, encresref, resref = r
118 encresref.set(encres)
118 encresref.set(encres)
119 resref.set(next(batchable))
119 resref.set(next(batchable))
120
120
121 class remoteiterbatcher(peer.iterbatcher):
121 class remoteiterbatcher(peer.iterbatcher):
122 def __init__(self, remote):
122 def __init__(self, remote):
123 super(remoteiterbatcher, self).__init__()
123 super(remoteiterbatcher, self).__init__()
124 self._remote = remote
124 self._remote = remote
125
125
126 def __getattr__(self, name):
126 def __getattr__(self, name):
127 if not getattr(self._remote, name, False):
127 if not getattr(self._remote, name, False):
128 raise AttributeError(
128 raise AttributeError(
129 'Attempted to iterbatch non-batchable call to %r' % name)
129 'Attempted to iterbatch non-batchable call to %r' % name)
130 return super(remoteiterbatcher, self).__getattr__(name)
130 return super(remoteiterbatcher, self).__getattr__(name)
131
131
132 def submit(self):
132 def submit(self):
133 """Break the batch request into many patch calls and pipeline them.
133 """Break the batch request into many patch calls and pipeline them.
134
134
135 This is mostly valuable over http where request sizes can be
135 This is mostly valuable over http where request sizes can be
136 limited, but can be used in other places as well.
136 limited, but can be used in other places as well.
137 """
137 """
138 req, rsp = [], []
138 req, rsp = [], []
139 for name, args, opts, resref in self.calls:
139 for name, args, opts, resref in self.calls:
140 mtd = getattr(self._remote, name)
140 mtd = getattr(self._remote, name)
141 batchable = mtd.batchable(mtd.im_self, *args, **opts)
141 batchable = mtd.batchable(mtd.im_self, *args, **opts)
142 encargsorres, encresref = next(batchable)
142 encargsorres, encresref = next(batchable)
143 assert encresref
143 assert encresref
144 req.append((name, encargsorres))
144 req.append((name, encargsorres))
145 rsp.append((batchable, encresref))
145 rsp.append((batchable, encresref))
146 if req:
146 if req:
147 self._resultiter = self._remote._submitbatch(req)
147 self._resultiter = self._remote._submitbatch(req)
148 self._rsp = rsp
148 self._rsp = rsp
149
149
150 def results(self):
150 def results(self):
151 for (batchable, encresref), encres in itertools.izip(
151 for (batchable, encresref), encres in itertools.izip(
152 self._rsp, self._resultiter):
152 self._rsp, self._resultiter):
153 encresref.set(encres)
153 encresref.set(encres)
154 yield next(batchable)
154 yield next(batchable)
155
155
156 # Forward a couple of names from peer to make wireproto interactions
156 # Forward a couple of names from peer to make wireproto interactions
157 # slightly more sensible.
157 # slightly more sensible.
158 batchable = peer.batchable
158 batchable = peer.batchable
159 future = peer.future
159 future = peer.future
160
160
161 # list of nodes encoding / decoding
161 # list of nodes encoding / decoding
162
162
163 def decodelist(l, sep=' '):
163 def decodelist(l, sep=' '):
164 if l:
164 if l:
165 return map(bin, l.split(sep))
165 return map(bin, l.split(sep))
166 return []
166 return []
167
167
168 def encodelist(l, sep=' '):
168 def encodelist(l, sep=' '):
169 try:
169 try:
170 return sep.join(map(hex, l))
170 return sep.join(map(hex, l))
171 except TypeError:
171 except TypeError:
172 raise
172 raise
173
173
174 # batched call argument encoding
174 # batched call argument encoding
175
175
176 def escapearg(plain):
176 def escapearg(plain):
177 return (plain
177 return (plain
178 .replace(':', ':c')
178 .replace(':', ':c')
179 .replace(',', ':o')
179 .replace(',', ':o')
180 .replace(';', ':s')
180 .replace(';', ':s')
181 .replace('=', ':e'))
181 .replace('=', ':e'))
182
182
183 def unescapearg(escaped):
183 def unescapearg(escaped):
184 return (escaped
184 return (escaped
185 .replace(':e', '=')
185 .replace(':e', '=')
186 .replace(':s', ';')
186 .replace(':s', ';')
187 .replace(':o', ',')
187 .replace(':o', ',')
188 .replace(':c', ':'))
188 .replace(':c', ':'))
189
189
190 def encodebatchcmds(req):
190 def encodebatchcmds(req):
191 """Return a ``cmds`` argument value for the ``batch`` command."""
191 """Return a ``cmds`` argument value for the ``batch`` command."""
192 cmds = []
192 cmds = []
193 for op, argsdict in req:
193 for op, argsdict in req:
194 # Old servers didn't properly unescape argument names. So prevent
195 # the sending of argument names that may not be decoded properly by
196 # servers.
197 assert all(escapearg(k) == k for k in argsdict)
198
194 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
199 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
195 for k, v in argsdict.iteritems())
200 for k, v in argsdict.iteritems())
196 cmds.append('%s %s' % (op, args))
201 cmds.append('%s %s' % (op, args))
197
202
198 return ';'.join(cmds)
203 return ';'.join(cmds)
199
204
200 # mapping of options accepted by getbundle and their types
205 # mapping of options accepted by getbundle and their types
201 #
206 #
202 # Meant to be extended by extensions. It is extensions responsibility to ensure
207 # Meant to be extended by extensions. It is extensions responsibility to ensure
203 # such options are properly processed in exchange.getbundle.
208 # such options are properly processed in exchange.getbundle.
204 #
209 #
205 # supported types are:
210 # supported types are:
206 #
211 #
207 # :nodes: list of binary nodes
212 # :nodes: list of binary nodes
208 # :csv: list of comma-separated values
213 # :csv: list of comma-separated values
209 # :scsv: list of comma-separated values return as set
214 # :scsv: list of comma-separated values return as set
210 # :plain: string with no transformation needed.
215 # :plain: string with no transformation needed.
211 gboptsmap = {'heads': 'nodes',
216 gboptsmap = {'heads': 'nodes',
212 'common': 'nodes',
217 'common': 'nodes',
213 'obsmarkers': 'boolean',
218 'obsmarkers': 'boolean',
214 'bundlecaps': 'scsv',
219 'bundlecaps': 'scsv',
215 'listkeys': 'csv',
220 'listkeys': 'csv',
216 'cg': 'boolean',
221 'cg': 'boolean',
217 'cbattempted': 'boolean'}
222 'cbattempted': 'boolean'}
218
223
219 # client side
224 # client side
220
225
221 class wirepeer(peer.peerrepository):
226 class wirepeer(peer.peerrepository):
222 """Client-side interface for communicating with a peer repository.
227 """Client-side interface for communicating with a peer repository.
223
228
224 Methods commonly call wire protocol commands of the same name.
229 Methods commonly call wire protocol commands of the same name.
225
230
226 See also httppeer.py and sshpeer.py for protocol-specific
231 See also httppeer.py and sshpeer.py for protocol-specific
227 implementations of this interface.
232 implementations of this interface.
228 """
233 """
229 def batch(self):
234 def batch(self):
230 if self.capable('batch'):
235 if self.capable('batch'):
231 return remotebatch(self)
236 return remotebatch(self)
232 else:
237 else:
233 return peer.localbatch(self)
238 return peer.localbatch(self)
234 def _submitbatch(self, req):
239 def _submitbatch(self, req):
235 """run batch request <req> on the server
240 """run batch request <req> on the server
236
241
237 Returns an iterator of the raw responses from the server.
242 Returns an iterator of the raw responses from the server.
238 """
243 """
239 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
244 rsp = self._callstream("batch", cmds=encodebatchcmds(req))
240 chunk = rsp.read(1024)
245 chunk = rsp.read(1024)
241 work = [chunk]
246 work = [chunk]
242 while chunk:
247 while chunk:
243 while ';' not in chunk and chunk:
248 while ';' not in chunk and chunk:
244 chunk = rsp.read(1024)
249 chunk = rsp.read(1024)
245 work.append(chunk)
250 work.append(chunk)
246 merged = ''.join(work)
251 merged = ''.join(work)
247 while ';' in merged:
252 while ';' in merged:
248 one, merged = merged.split(';', 1)
253 one, merged = merged.split(';', 1)
249 yield unescapearg(one)
254 yield unescapearg(one)
250 chunk = rsp.read(1024)
255 chunk = rsp.read(1024)
251 work = [merged, chunk]
256 work = [merged, chunk]
252 yield unescapearg(''.join(work))
257 yield unescapearg(''.join(work))
253
258
254 def _submitone(self, op, args):
259 def _submitone(self, op, args):
255 return self._call(op, **args)
260 return self._call(op, **args)
256
261
257 def iterbatch(self):
262 def iterbatch(self):
258 return remoteiterbatcher(self)
263 return remoteiterbatcher(self)
259
264
260 @batchable
265 @batchable
261 def lookup(self, key):
266 def lookup(self, key):
262 self.requirecap('lookup', _('look up remote revision'))
267 self.requirecap('lookup', _('look up remote revision'))
263 f = future()
268 f = future()
264 yield {'key': encoding.fromlocal(key)}, f
269 yield {'key': encoding.fromlocal(key)}, f
265 d = f.value
270 d = f.value
266 success, data = d[:-1].split(" ", 1)
271 success, data = d[:-1].split(" ", 1)
267 if int(success):
272 if int(success):
268 yield bin(data)
273 yield bin(data)
269 self._abort(error.RepoError(data))
274 self._abort(error.RepoError(data))
270
275
271 @batchable
276 @batchable
272 def heads(self):
277 def heads(self):
273 f = future()
278 f = future()
274 yield {}, f
279 yield {}, f
275 d = f.value
280 d = f.value
276 try:
281 try:
277 yield decodelist(d[:-1])
282 yield decodelist(d[:-1])
278 except ValueError:
283 except ValueError:
279 self._abort(error.ResponseError(_("unexpected response:"), d))
284 self._abort(error.ResponseError(_("unexpected response:"), d))
280
285
281 @batchable
286 @batchable
282 def known(self, nodes):
287 def known(self, nodes):
283 f = future()
288 f = future()
284 yield {'nodes': encodelist(nodes)}, f
289 yield {'nodes': encodelist(nodes)}, f
285 d = f.value
290 d = f.value
286 try:
291 try:
287 yield [bool(int(b)) for b in d]
292 yield [bool(int(b)) for b in d]
288 except ValueError:
293 except ValueError:
289 self._abort(error.ResponseError(_("unexpected response:"), d))
294 self._abort(error.ResponseError(_("unexpected response:"), d))
290
295
291 @batchable
296 @batchable
292 def branchmap(self):
297 def branchmap(self):
293 f = future()
298 f = future()
294 yield {}, f
299 yield {}, f
295 d = f.value
300 d = f.value
296 try:
301 try:
297 branchmap = {}
302 branchmap = {}
298 for branchpart in d.splitlines():
303 for branchpart in d.splitlines():
299 branchname, branchheads = branchpart.split(' ', 1)
304 branchname, branchheads = branchpart.split(' ', 1)
300 branchname = encoding.tolocal(urlreq.unquote(branchname))
305 branchname = encoding.tolocal(urlreq.unquote(branchname))
301 branchheads = decodelist(branchheads)
306 branchheads = decodelist(branchheads)
302 branchmap[branchname] = branchheads
307 branchmap[branchname] = branchheads
303 yield branchmap
308 yield branchmap
304 except TypeError:
309 except TypeError:
305 self._abort(error.ResponseError(_("unexpected response:"), d))
310 self._abort(error.ResponseError(_("unexpected response:"), d))
306
311
307 def branches(self, nodes):
312 def branches(self, nodes):
308 n = encodelist(nodes)
313 n = encodelist(nodes)
309 d = self._call("branches", nodes=n)
314 d = self._call("branches", nodes=n)
310 try:
315 try:
311 br = [tuple(decodelist(b)) for b in d.splitlines()]
316 br = [tuple(decodelist(b)) for b in d.splitlines()]
312 return br
317 return br
313 except ValueError:
318 except ValueError:
314 self._abort(error.ResponseError(_("unexpected response:"), d))
319 self._abort(error.ResponseError(_("unexpected response:"), d))
315
320
316 def between(self, pairs):
321 def between(self, pairs):
317 batch = 8 # avoid giant requests
322 batch = 8 # avoid giant requests
318 r = []
323 r = []
319 for i in xrange(0, len(pairs), batch):
324 for i in xrange(0, len(pairs), batch):
320 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
325 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
321 d = self._call("between", pairs=n)
326 d = self._call("between", pairs=n)
322 try:
327 try:
323 r.extend(l and decodelist(l) or [] for l in d.splitlines())
328 r.extend(l and decodelist(l) or [] for l in d.splitlines())
324 except ValueError:
329 except ValueError:
325 self._abort(error.ResponseError(_("unexpected response:"), d))
330 self._abort(error.ResponseError(_("unexpected response:"), d))
326 return r
331 return r
327
332
328 @batchable
333 @batchable
329 def pushkey(self, namespace, key, old, new):
334 def pushkey(self, namespace, key, old, new):
330 if not self.capable('pushkey'):
335 if not self.capable('pushkey'):
331 yield False, None
336 yield False, None
332 f = future()
337 f = future()
333 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
338 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
334 yield {'namespace': encoding.fromlocal(namespace),
339 yield {'namespace': encoding.fromlocal(namespace),
335 'key': encoding.fromlocal(key),
340 'key': encoding.fromlocal(key),
336 'old': encoding.fromlocal(old),
341 'old': encoding.fromlocal(old),
337 'new': encoding.fromlocal(new)}, f
342 'new': encoding.fromlocal(new)}, f
338 d = f.value
343 d = f.value
339 d, output = d.split('\n', 1)
344 d, output = d.split('\n', 1)
340 try:
345 try:
341 d = bool(int(d))
346 d = bool(int(d))
342 except ValueError:
347 except ValueError:
343 raise error.ResponseError(
348 raise error.ResponseError(
344 _('push failed (unexpected response):'), d)
349 _('push failed (unexpected response):'), d)
345 for l in output.splitlines(True):
350 for l in output.splitlines(True):
346 self.ui.status(_('remote: '), l)
351 self.ui.status(_('remote: '), l)
347 yield d
352 yield d
348
353
349 @batchable
354 @batchable
350 def listkeys(self, namespace):
355 def listkeys(self, namespace):
351 if not self.capable('pushkey'):
356 if not self.capable('pushkey'):
352 yield {}, None
357 yield {}, None
353 f = future()
358 f = future()
354 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
359 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
355 yield {'namespace': encoding.fromlocal(namespace)}, f
360 yield {'namespace': encoding.fromlocal(namespace)}, f
356 d = f.value
361 d = f.value
357 self.ui.debug('received listkey for "%s": %i bytes\n'
362 self.ui.debug('received listkey for "%s": %i bytes\n'
358 % (namespace, len(d)))
363 % (namespace, len(d)))
359 yield pushkeymod.decodekeys(d)
364 yield pushkeymod.decodekeys(d)
360
365
361 def stream_out(self):
366 def stream_out(self):
362 return self._callstream('stream_out')
367 return self._callstream('stream_out')
363
368
364 def changegroup(self, nodes, kind):
369 def changegroup(self, nodes, kind):
365 n = encodelist(nodes)
370 n = encodelist(nodes)
366 f = self._callcompressable("changegroup", roots=n)
371 f = self._callcompressable("changegroup", roots=n)
367 return changegroupmod.cg1unpacker(f, 'UN')
372 return changegroupmod.cg1unpacker(f, 'UN')
368
373
369 def changegroupsubset(self, bases, heads, kind):
374 def changegroupsubset(self, bases, heads, kind):
370 self.requirecap('changegroupsubset', _('look up remote changes'))
375 self.requirecap('changegroupsubset', _('look up remote changes'))
371 bases = encodelist(bases)
376 bases = encodelist(bases)
372 heads = encodelist(heads)
377 heads = encodelist(heads)
373 f = self._callcompressable("changegroupsubset",
378 f = self._callcompressable("changegroupsubset",
374 bases=bases, heads=heads)
379 bases=bases, heads=heads)
375 return changegroupmod.cg1unpacker(f, 'UN')
380 return changegroupmod.cg1unpacker(f, 'UN')
376
381
377 def getbundle(self, source, **kwargs):
382 def getbundle(self, source, **kwargs):
378 self.requirecap('getbundle', _('look up remote changes'))
383 self.requirecap('getbundle', _('look up remote changes'))
379 opts = {}
384 opts = {}
380 bundlecaps = kwargs.get('bundlecaps')
385 bundlecaps = kwargs.get('bundlecaps')
381 if bundlecaps is not None:
386 if bundlecaps is not None:
382 kwargs['bundlecaps'] = sorted(bundlecaps)
387 kwargs['bundlecaps'] = sorted(bundlecaps)
383 else:
388 else:
384 bundlecaps = () # kwargs could have it to None
389 bundlecaps = () # kwargs could have it to None
385 for key, value in kwargs.iteritems():
390 for key, value in kwargs.iteritems():
386 if value is None:
391 if value is None:
387 continue
392 continue
388 keytype = gboptsmap.get(key)
393 keytype = gboptsmap.get(key)
389 if keytype is None:
394 if keytype is None:
390 assert False, 'unexpected'
395 assert False, 'unexpected'
391 elif keytype == 'nodes':
396 elif keytype == 'nodes':
392 value = encodelist(value)
397 value = encodelist(value)
393 elif keytype in ('csv', 'scsv'):
398 elif keytype in ('csv', 'scsv'):
394 value = ','.join(value)
399 value = ','.join(value)
395 elif keytype == 'boolean':
400 elif keytype == 'boolean':
396 value = '%i' % bool(value)
401 value = '%i' % bool(value)
397 elif keytype != 'plain':
402 elif keytype != 'plain':
398 raise KeyError('unknown getbundle option type %s'
403 raise KeyError('unknown getbundle option type %s'
399 % keytype)
404 % keytype)
400 opts[key] = value
405 opts[key] = value
401 f = self._callcompressable("getbundle", **opts)
406 f = self._callcompressable("getbundle", **opts)
402 if any((cap.startswith('HG2') for cap in bundlecaps)):
407 if any((cap.startswith('HG2') for cap in bundlecaps)):
403 return bundle2.getunbundler(self.ui, f)
408 return bundle2.getunbundler(self.ui, f)
404 else:
409 else:
405 return changegroupmod.cg1unpacker(f, 'UN')
410 return changegroupmod.cg1unpacker(f, 'UN')
406
411
407 def unbundle(self, cg, heads, url):
412 def unbundle(self, cg, heads, url):
408 '''Send cg (a readable file-like object representing the
413 '''Send cg (a readable file-like object representing the
409 changegroup to push, typically a chunkbuffer object) to the
414 changegroup to push, typically a chunkbuffer object) to the
410 remote server as a bundle.
415 remote server as a bundle.
411
416
412 When pushing a bundle10 stream, return an integer indicating the
417 When pushing a bundle10 stream, return an integer indicating the
413 result of the push (see localrepository.addchangegroup()).
418 result of the push (see localrepository.addchangegroup()).
414
419
415 When pushing a bundle20 stream, return a bundle20 stream.
420 When pushing a bundle20 stream, return a bundle20 stream.
416
421
417 `url` is the url the client thinks it's pushing to, which is
422 `url` is the url the client thinks it's pushing to, which is
418 visible to hooks.
423 visible to hooks.
419 '''
424 '''
420
425
421 if heads != ['force'] and self.capable('unbundlehash'):
426 if heads != ['force'] and self.capable('unbundlehash'):
422 heads = encodelist(['hashed',
427 heads = encodelist(['hashed',
423 hashlib.sha1(''.join(sorted(heads))).digest()])
428 hashlib.sha1(''.join(sorted(heads))).digest()])
424 else:
429 else:
425 heads = encodelist(heads)
430 heads = encodelist(heads)
426
431
427 if util.safehasattr(cg, 'deltaheader'):
432 if util.safehasattr(cg, 'deltaheader'):
428 # this a bundle10, do the old style call sequence
433 # this a bundle10, do the old style call sequence
429 ret, output = self._callpush("unbundle", cg, heads=heads)
434 ret, output = self._callpush("unbundle", cg, heads=heads)
430 if ret == "":
435 if ret == "":
431 raise error.ResponseError(
436 raise error.ResponseError(
432 _('push failed:'), output)
437 _('push failed:'), output)
433 try:
438 try:
434 ret = int(ret)
439 ret = int(ret)
435 except ValueError:
440 except ValueError:
436 raise error.ResponseError(
441 raise error.ResponseError(
437 _('push failed (unexpected response):'), ret)
442 _('push failed (unexpected response):'), ret)
438
443
439 for l in output.splitlines(True):
444 for l in output.splitlines(True):
440 self.ui.status(_('remote: '), l)
445 self.ui.status(_('remote: '), l)
441 else:
446 else:
442 # bundle2 push. Send a stream, fetch a stream.
447 # bundle2 push. Send a stream, fetch a stream.
443 stream = self._calltwowaystream('unbundle', cg, heads=heads)
448 stream = self._calltwowaystream('unbundle', cg, heads=heads)
444 ret = bundle2.getunbundler(self.ui, stream)
449 ret = bundle2.getunbundler(self.ui, stream)
445 return ret
450 return ret
446
451
447 def debugwireargs(self, one, two, three=None, four=None, five=None):
452 def debugwireargs(self, one, two, three=None, four=None, five=None):
448 # don't pass optional arguments left at their default value
453 # don't pass optional arguments left at their default value
449 opts = {}
454 opts = {}
450 if three is not None:
455 if three is not None:
451 opts['three'] = three
456 opts['three'] = three
452 if four is not None:
457 if four is not None:
453 opts['four'] = four
458 opts['four'] = four
454 return self._call('debugwireargs', one=one, two=two, **opts)
459 return self._call('debugwireargs', one=one, two=two, **opts)
455
460
456 def _call(self, cmd, **args):
461 def _call(self, cmd, **args):
457 """execute <cmd> on the server
462 """execute <cmd> on the server
458
463
459 The command is expected to return a simple string.
464 The command is expected to return a simple string.
460
465
461 returns the server reply as a string."""
466 returns the server reply as a string."""
462 raise NotImplementedError()
467 raise NotImplementedError()
463
468
464 def _callstream(self, cmd, **args):
469 def _callstream(self, cmd, **args):
465 """execute <cmd> on the server
470 """execute <cmd> on the server
466
471
467 The command is expected to return a stream. Note that if the
472 The command is expected to return a stream. Note that if the
468 command doesn't return a stream, _callstream behaves
473 command doesn't return a stream, _callstream behaves
469 differently for ssh and http peers.
474 differently for ssh and http peers.
470
475
471 returns the server reply as a file like object.
476 returns the server reply as a file like object.
472 """
477 """
473 raise NotImplementedError()
478 raise NotImplementedError()
474
479
475 def _callcompressable(self, cmd, **args):
480 def _callcompressable(self, cmd, **args):
476 """execute <cmd> on the server
481 """execute <cmd> on the server
477
482
478 The command is expected to return a stream.
483 The command is expected to return a stream.
479
484
480 The stream may have been compressed in some implementations. This
485 The stream may have been compressed in some implementations. This
481 function takes care of the decompression. This is the only difference
486 function takes care of the decompression. This is the only difference
482 with _callstream.
487 with _callstream.
483
488
484 returns the server reply as a file like object.
489 returns the server reply as a file like object.
485 """
490 """
486 raise NotImplementedError()
491 raise NotImplementedError()
487
492
488 def _callpush(self, cmd, fp, **args):
493 def _callpush(self, cmd, fp, **args):
489 """execute a <cmd> on server
494 """execute a <cmd> on server
490
495
491 The command is expected to be related to a push. Push has a special
496 The command is expected to be related to a push. Push has a special
492 return method.
497 return method.
493
498
494 returns the server reply as a (ret, output) tuple. ret is either
499 returns the server reply as a (ret, output) tuple. ret is either
495 empty (error) or a stringified int.
500 empty (error) or a stringified int.
496 """
501 """
497 raise NotImplementedError()
502 raise NotImplementedError()
498
503
499 def _calltwowaystream(self, cmd, fp, **args):
504 def _calltwowaystream(self, cmd, fp, **args):
500 """execute <cmd> on server
505 """execute <cmd> on server
501
506
502 The command will send a stream to the server and get a stream in reply.
507 The command will send a stream to the server and get a stream in reply.
503 """
508 """
504 raise NotImplementedError()
509 raise NotImplementedError()
505
510
506 def _abort(self, exception):
511 def _abort(self, exception):
507 """clearly abort the wire protocol connection and raise the exception
512 """clearly abort the wire protocol connection and raise the exception
508 """
513 """
509 raise NotImplementedError()
514 raise NotImplementedError()
510
515
511 # server side
516 # server side
512
517
513 # wire protocol command can either return a string or one of these classes.
518 # wire protocol command can either return a string or one of these classes.
514 class streamres(object):
519 class streamres(object):
515 """wireproto reply: binary stream
520 """wireproto reply: binary stream
516
521
517 The call was successful and the result is a stream.
522 The call was successful and the result is a stream.
518 Iterate on the `self.gen` attribute to retrieve chunks.
523 Iterate on the `self.gen` attribute to retrieve chunks.
519 """
524 """
520 def __init__(self, gen):
525 def __init__(self, gen):
521 self.gen = gen
526 self.gen = gen
522
527
523 class pushres(object):
528 class pushres(object):
524 """wireproto reply: success with simple integer return
529 """wireproto reply: success with simple integer return
525
530
526 The call was successful and returned an integer contained in `self.res`.
531 The call was successful and returned an integer contained in `self.res`.
527 """
532 """
528 def __init__(self, res):
533 def __init__(self, res):
529 self.res = res
534 self.res = res
530
535
531 class pusherr(object):
536 class pusherr(object):
532 """wireproto reply: failure
537 """wireproto reply: failure
533
538
534 The call failed. The `self.res` attribute contains the error message.
539 The call failed. The `self.res` attribute contains the error message.
535 """
540 """
536 def __init__(self, res):
541 def __init__(self, res):
537 self.res = res
542 self.res = res
538
543
539 class ooberror(object):
544 class ooberror(object):
540 """wireproto reply: failure of a batch of operation
545 """wireproto reply: failure of a batch of operation
541
546
542 Something failed during a batch call. The error message is stored in
547 Something failed during a batch call. The error message is stored in
543 `self.message`.
548 `self.message`.
544 """
549 """
545 def __init__(self, message):
550 def __init__(self, message):
546 self.message = message
551 self.message = message
547
552
548 def getdispatchrepo(repo, proto, command):
553 def getdispatchrepo(repo, proto, command):
549 """Obtain the repo used for processing wire protocol commands.
554 """Obtain the repo used for processing wire protocol commands.
550
555
551 The intent of this function is to serve as a monkeypatch point for
556 The intent of this function is to serve as a monkeypatch point for
552 extensions that need commands to operate on different repo views under
557 extensions that need commands to operate on different repo views under
553 specialized circumstances.
558 specialized circumstances.
554 """
559 """
555 return repo.filtered('served')
560 return repo.filtered('served')
556
561
557 def dispatch(repo, proto, command):
562 def dispatch(repo, proto, command):
558 repo = getdispatchrepo(repo, proto, command)
563 repo = getdispatchrepo(repo, proto, command)
559 func, spec = commands[command]
564 func, spec = commands[command]
560 args = proto.getargs(spec)
565 args = proto.getargs(spec)
561 return func(repo, proto, *args)
566 return func(repo, proto, *args)
562
567
563 def options(cmd, keys, others):
568 def options(cmd, keys, others):
564 opts = {}
569 opts = {}
565 for k in keys:
570 for k in keys:
566 if k in others:
571 if k in others:
567 opts[k] = others[k]
572 opts[k] = others[k]
568 del others[k]
573 del others[k]
569 if others:
574 if others:
570 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
575 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
571 % (cmd, ",".join(others)))
576 % (cmd, ",".join(others)))
572 return opts
577 return opts
573
578
574 def bundle1allowed(repo, action):
579 def bundle1allowed(repo, action):
575 """Whether a bundle1 operation is allowed from the server.
580 """Whether a bundle1 operation is allowed from the server.
576
581
577 Priority is:
582 Priority is:
578
583
579 1. server.bundle1gd.<action> (if generaldelta active)
584 1. server.bundle1gd.<action> (if generaldelta active)
580 2. server.bundle1.<action>
585 2. server.bundle1.<action>
581 3. server.bundle1gd (if generaldelta active)
586 3. server.bundle1gd (if generaldelta active)
582 4. server.bundle1
587 4. server.bundle1
583 """
588 """
584 ui = repo.ui
589 ui = repo.ui
585 gd = 'generaldelta' in repo.requirements
590 gd = 'generaldelta' in repo.requirements
586
591
587 if gd:
592 if gd:
588 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
593 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
589 if v is not None:
594 if v is not None:
590 return v
595 return v
591
596
592 v = ui.configbool('server', 'bundle1.%s' % action, None)
597 v = ui.configbool('server', 'bundle1.%s' % action, None)
593 if v is not None:
598 if v is not None:
594 return v
599 return v
595
600
596 if gd:
601 if gd:
597 v = ui.configbool('server', 'bundle1gd', None)
602 v = ui.configbool('server', 'bundle1gd', None)
598 if v is not None:
603 if v is not None:
599 return v
604 return v
600
605
601 return ui.configbool('server', 'bundle1', True)
606 return ui.configbool('server', 'bundle1', True)
602
607
603 # list of commands
608 # list of commands
604 commands = {}
609 commands = {}
605
610
606 def wireprotocommand(name, args=''):
611 def wireprotocommand(name, args=''):
607 """decorator for wire protocol command"""
612 """decorator for wire protocol command"""
608 def register(func):
613 def register(func):
609 commands[name] = (func, args)
614 commands[name] = (func, args)
610 return func
615 return func
611 return register
616 return register
612
617
613 @wireprotocommand('batch', 'cmds *')
618 @wireprotocommand('batch', 'cmds *')
614 def batch(repo, proto, cmds, others):
619 def batch(repo, proto, cmds, others):
615 repo = repo.filtered("served")
620 repo = repo.filtered("served")
616 res = []
621 res = []
617 for pair in cmds.split(';'):
622 for pair in cmds.split(';'):
618 op, args = pair.split(' ', 1)
623 op, args = pair.split(' ', 1)
619 vals = {}
624 vals = {}
620 for a in args.split(','):
625 for a in args.split(','):
621 if a:
626 if a:
622 n, v = a.split('=')
627 n, v = a.split('=')
623 vals[n] = unescapearg(v)
628 vals[unescapearg(n)] = unescapearg(v)
624 func, spec = commands[op]
629 func, spec = commands[op]
625 if spec:
630 if spec:
626 keys = spec.split()
631 keys = spec.split()
627 data = {}
632 data = {}
628 for k in keys:
633 for k in keys:
629 if k == '*':
634 if k == '*':
630 star = {}
635 star = {}
631 for key in vals.keys():
636 for key in vals.keys():
632 if key not in keys:
637 if key not in keys:
633 star[key] = vals[key]
638 star[key] = vals[key]
634 data['*'] = star
639 data['*'] = star
635 else:
640 else:
636 data[k] = vals[k]
641 data[k] = vals[k]
637 result = func(repo, proto, *[data[k] for k in keys])
642 result = func(repo, proto, *[data[k] for k in keys])
638 else:
643 else:
639 result = func(repo, proto)
644 result = func(repo, proto)
640 if isinstance(result, ooberror):
645 if isinstance(result, ooberror):
641 return result
646 return result
642 res.append(escapearg(result))
647 res.append(escapearg(result))
643 return ';'.join(res)
648 return ';'.join(res)
644
649
645 @wireprotocommand('between', 'pairs')
650 @wireprotocommand('between', 'pairs')
646 def between(repo, proto, pairs):
651 def between(repo, proto, pairs):
647 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
652 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
648 r = []
653 r = []
649 for b in repo.between(pairs):
654 for b in repo.between(pairs):
650 r.append(encodelist(b) + "\n")
655 r.append(encodelist(b) + "\n")
651 return "".join(r)
656 return "".join(r)
652
657
653 @wireprotocommand('branchmap')
658 @wireprotocommand('branchmap')
654 def branchmap(repo, proto):
659 def branchmap(repo, proto):
655 branchmap = repo.branchmap()
660 branchmap = repo.branchmap()
656 heads = []
661 heads = []
657 for branch, nodes in branchmap.iteritems():
662 for branch, nodes in branchmap.iteritems():
658 branchname = urlreq.quote(encoding.fromlocal(branch))
663 branchname = urlreq.quote(encoding.fromlocal(branch))
659 branchnodes = encodelist(nodes)
664 branchnodes = encodelist(nodes)
660 heads.append('%s %s' % (branchname, branchnodes))
665 heads.append('%s %s' % (branchname, branchnodes))
661 return '\n'.join(heads)
666 return '\n'.join(heads)
662
667
663 @wireprotocommand('branches', 'nodes')
668 @wireprotocommand('branches', 'nodes')
664 def branches(repo, proto, nodes):
669 def branches(repo, proto, nodes):
665 nodes = decodelist(nodes)
670 nodes = decodelist(nodes)
666 r = []
671 r = []
667 for b in repo.branches(nodes):
672 for b in repo.branches(nodes):
668 r.append(encodelist(b) + "\n")
673 r.append(encodelist(b) + "\n")
669 return "".join(r)
674 return "".join(r)
670
675
671 @wireprotocommand('clonebundles', '')
676 @wireprotocommand('clonebundles', '')
672 def clonebundles(repo, proto):
677 def clonebundles(repo, proto):
673 """Server command for returning info for available bundles to seed clones.
678 """Server command for returning info for available bundles to seed clones.
674
679
675 Clients will parse this response and determine what bundle to fetch.
680 Clients will parse this response and determine what bundle to fetch.
676
681
677 Extensions may wrap this command to filter or dynamically emit data
682 Extensions may wrap this command to filter or dynamically emit data
678 depending on the request. e.g. you could advertise URLs for the closest
683 depending on the request. e.g. you could advertise URLs for the closest
679 data center given the client's IP address.
684 data center given the client's IP address.
680 """
685 """
681 return repo.opener.tryread('clonebundles.manifest')
686 return repo.opener.tryread('clonebundles.manifest')
682
687
683 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
688 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
684 'known', 'getbundle', 'unbundlehash', 'batch']
689 'known', 'getbundle', 'unbundlehash', 'batch']
685
690
686 def _capabilities(repo, proto):
691 def _capabilities(repo, proto):
687 """return a list of capabilities for a repo
692 """return a list of capabilities for a repo
688
693
689 This function exists to allow extensions to easily wrap capabilities
694 This function exists to allow extensions to easily wrap capabilities
690 computation
695 computation
691
696
692 - returns a lists: easy to alter
697 - returns a lists: easy to alter
693 - change done here will be propagated to both `capabilities` and `hello`
698 - change done here will be propagated to both `capabilities` and `hello`
694 command without any other action needed.
699 command without any other action needed.
695 """
700 """
696 # copy to prevent modification of the global list
701 # copy to prevent modification of the global list
697 caps = list(wireprotocaps)
702 caps = list(wireprotocaps)
698 if streamclone.allowservergeneration(repo.ui):
703 if streamclone.allowservergeneration(repo.ui):
699 if repo.ui.configbool('server', 'preferuncompressed', False):
704 if repo.ui.configbool('server', 'preferuncompressed', False):
700 caps.append('stream-preferred')
705 caps.append('stream-preferred')
701 requiredformats = repo.requirements & repo.supportedformats
706 requiredformats = repo.requirements & repo.supportedformats
702 # if our local revlogs are just revlogv1, add 'stream' cap
707 # if our local revlogs are just revlogv1, add 'stream' cap
703 if not requiredformats - set(('revlogv1',)):
708 if not requiredformats - set(('revlogv1',)):
704 caps.append('stream')
709 caps.append('stream')
705 # otherwise, add 'streamreqs' detailing our local revlog format
710 # otherwise, add 'streamreqs' detailing our local revlog format
706 else:
711 else:
707 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
712 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
708 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
713 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
709 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
714 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
710 caps.append('bundle2=' + urlreq.quote(capsblob))
715 caps.append('bundle2=' + urlreq.quote(capsblob))
711 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
716 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
712 caps.append(
717 caps.append(
713 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
718 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
714 if repo.ui.configbool('experimental', 'httppostargs', False):
719 if repo.ui.configbool('experimental', 'httppostargs', False):
715 caps.append('httppostargs')
720 caps.append('httppostargs')
716 return caps
721 return caps
717
722
718 # If you are writing an extension and consider wrapping this function. Wrap
723 # If you are writing an extension and consider wrapping this function. Wrap
719 # `_capabilities` instead.
724 # `_capabilities` instead.
720 @wireprotocommand('capabilities')
725 @wireprotocommand('capabilities')
721 def capabilities(repo, proto):
726 def capabilities(repo, proto):
722 return ' '.join(_capabilities(repo, proto))
727 return ' '.join(_capabilities(repo, proto))
723
728
724 @wireprotocommand('changegroup', 'roots')
729 @wireprotocommand('changegroup', 'roots')
725 def changegroup(repo, proto, roots):
730 def changegroup(repo, proto, roots):
726 nodes = decodelist(roots)
731 nodes = decodelist(roots)
727 cg = changegroupmod.changegroup(repo, nodes, 'serve')
732 cg = changegroupmod.changegroup(repo, nodes, 'serve')
728 return streamres(proto.groupchunks(cg))
733 return streamres(proto.groupchunks(cg))
729
734
730 @wireprotocommand('changegroupsubset', 'bases heads')
735 @wireprotocommand('changegroupsubset', 'bases heads')
731 def changegroupsubset(repo, proto, bases, heads):
736 def changegroupsubset(repo, proto, bases, heads):
732 bases = decodelist(bases)
737 bases = decodelist(bases)
733 heads = decodelist(heads)
738 heads = decodelist(heads)
734 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
739 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
735 return streamres(proto.groupchunks(cg))
740 return streamres(proto.groupchunks(cg))
736
741
737 @wireprotocommand('debugwireargs', 'one two *')
742 @wireprotocommand('debugwireargs', 'one two *')
738 def debugwireargs(repo, proto, one, two, others):
743 def debugwireargs(repo, proto, one, two, others):
739 # only accept optional args from the known set
744 # only accept optional args from the known set
740 opts = options('debugwireargs', ['three', 'four'], others)
745 opts = options('debugwireargs', ['three', 'four'], others)
741 return repo.debugwireargs(one, two, **opts)
746 return repo.debugwireargs(one, two, **opts)
742
747
743 # List of options accepted by getbundle.
748 # List of options accepted by getbundle.
744 #
749 #
745 # Meant to be extended by extensions. It is the extension's responsibility to
750 # Meant to be extended by extensions. It is the extension's responsibility to
746 # ensure such options are properly processed in exchange.getbundle.
751 # ensure such options are properly processed in exchange.getbundle.
747 gboptslist = ['heads', 'common', 'bundlecaps']
752 gboptslist = ['heads', 'common', 'bundlecaps']
748
753
749 @wireprotocommand('getbundle', '*')
754 @wireprotocommand('getbundle', '*')
750 def getbundle(repo, proto, others):
755 def getbundle(repo, proto, others):
751 opts = options('getbundle', gboptsmap.keys(), others)
756 opts = options('getbundle', gboptsmap.keys(), others)
752 for k, v in opts.iteritems():
757 for k, v in opts.iteritems():
753 keytype = gboptsmap[k]
758 keytype = gboptsmap[k]
754 if keytype == 'nodes':
759 if keytype == 'nodes':
755 opts[k] = decodelist(v)
760 opts[k] = decodelist(v)
756 elif keytype == 'csv':
761 elif keytype == 'csv':
757 opts[k] = list(v.split(','))
762 opts[k] = list(v.split(','))
758 elif keytype == 'scsv':
763 elif keytype == 'scsv':
759 opts[k] = set(v.split(','))
764 opts[k] = set(v.split(','))
760 elif keytype == 'boolean':
765 elif keytype == 'boolean':
761 # Client should serialize False as '0', which is a non-empty string
766 # Client should serialize False as '0', which is a non-empty string
762 # so it evaluates as a True bool.
767 # so it evaluates as a True bool.
763 if v == '0':
768 if v == '0':
764 opts[k] = False
769 opts[k] = False
765 else:
770 else:
766 opts[k] = bool(v)
771 opts[k] = bool(v)
767 elif keytype != 'plain':
772 elif keytype != 'plain':
768 raise KeyError('unknown getbundle option type %s'
773 raise KeyError('unknown getbundle option type %s'
769 % keytype)
774 % keytype)
770
775
771 if not bundle1allowed(repo, 'pull'):
776 if not bundle1allowed(repo, 'pull'):
772 if not exchange.bundle2requested(opts.get('bundlecaps')):
777 if not exchange.bundle2requested(opts.get('bundlecaps')):
773 return ooberror(bundle2required)
778 return ooberror(bundle2required)
774
779
775 cg = exchange.getbundle(repo, 'serve', **opts)
780 cg = exchange.getbundle(repo, 'serve', **opts)
776 return streamres(proto.groupchunks(cg))
781 return streamres(proto.groupchunks(cg))
777
782
778 @wireprotocommand('heads')
783 @wireprotocommand('heads')
779 def heads(repo, proto):
784 def heads(repo, proto):
780 h = repo.heads()
785 h = repo.heads()
781 return encodelist(h) + "\n"
786 return encodelist(h) + "\n"
782
787
783 @wireprotocommand('hello')
788 @wireprotocommand('hello')
784 def hello(repo, proto):
789 def hello(repo, proto):
785 '''the hello command returns a set of lines describing various
790 '''the hello command returns a set of lines describing various
786 interesting things about the server, in an RFC822-like format.
791 interesting things about the server, in an RFC822-like format.
787 Currently the only one defined is "capabilities", which
792 Currently the only one defined is "capabilities", which
788 consists of a line in the form:
793 consists of a line in the form:
789
794
790 capabilities: space separated list of tokens
795 capabilities: space separated list of tokens
791 '''
796 '''
792 return "capabilities: %s\n" % (capabilities(repo, proto))
797 return "capabilities: %s\n" % (capabilities(repo, proto))
793
798
794 @wireprotocommand('listkeys', 'namespace')
799 @wireprotocommand('listkeys', 'namespace')
795 def listkeys(repo, proto, namespace):
800 def listkeys(repo, proto, namespace):
796 d = repo.listkeys(encoding.tolocal(namespace)).items()
801 d = repo.listkeys(encoding.tolocal(namespace)).items()
797 return pushkeymod.encodekeys(d)
802 return pushkeymod.encodekeys(d)
798
803
799 @wireprotocommand('lookup', 'key')
804 @wireprotocommand('lookup', 'key')
800 def lookup(repo, proto, key):
805 def lookup(repo, proto, key):
801 try:
806 try:
802 k = encoding.tolocal(key)
807 k = encoding.tolocal(key)
803 c = repo[k]
808 c = repo[k]
804 r = c.hex()
809 r = c.hex()
805 success = 1
810 success = 1
806 except Exception as inst:
811 except Exception as inst:
807 r = str(inst)
812 r = str(inst)
808 success = 0
813 success = 0
809 return "%s %s\n" % (success, r)
814 return "%s %s\n" % (success, r)
810
815
811 @wireprotocommand('known', 'nodes *')
816 @wireprotocommand('known', 'nodes *')
812 def known(repo, proto, nodes, others):
817 def known(repo, proto, nodes, others):
813 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
818 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
814
819
815 @wireprotocommand('pushkey', 'namespace key old new')
820 @wireprotocommand('pushkey', 'namespace key old new')
816 def pushkey(repo, proto, namespace, key, old, new):
821 def pushkey(repo, proto, namespace, key, old, new):
817 # compatibility with pre-1.8 clients which were accidentally
822 # compatibility with pre-1.8 clients which were accidentally
818 # sending raw binary nodes rather than utf-8-encoded hex
823 # sending raw binary nodes rather than utf-8-encoded hex
819 if len(new) == 20 and new.encode('string-escape') != new:
824 if len(new) == 20 and new.encode('string-escape') != new:
820 # looks like it could be a binary node
825 # looks like it could be a binary node
821 try:
826 try:
822 new.decode('utf-8')
827 new.decode('utf-8')
823 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
828 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
824 except UnicodeDecodeError:
829 except UnicodeDecodeError:
825 pass # binary, leave unmodified
830 pass # binary, leave unmodified
826 else:
831 else:
827 new = encoding.tolocal(new) # normal path
832 new = encoding.tolocal(new) # normal path
828
833
829 if util.safehasattr(proto, 'restore'):
834 if util.safehasattr(proto, 'restore'):
830
835
831 proto.redirect()
836 proto.redirect()
832
837
833 try:
838 try:
834 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
839 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
835 encoding.tolocal(old), new) or False
840 encoding.tolocal(old), new) or False
836 except error.Abort:
841 except error.Abort:
837 r = False
842 r = False
838
843
839 output = proto.restore()
844 output = proto.restore()
840
845
841 return '%s\n%s' % (int(r), output)
846 return '%s\n%s' % (int(r), output)
842
847
843 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
848 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
844 encoding.tolocal(old), new)
849 encoding.tolocal(old), new)
845 return '%s\n' % int(r)
850 return '%s\n' % int(r)
846
851
847 @wireprotocommand('stream_out')
852 @wireprotocommand('stream_out')
848 def stream(repo, proto):
853 def stream(repo, proto):
849 '''If the server supports streaming clone, it advertises the "stream"
854 '''If the server supports streaming clone, it advertises the "stream"
850 capability with a value representing the version and flags of the repo
855 capability with a value representing the version and flags of the repo
851 it is serving. Client checks to see if it understands the format.
856 it is serving. Client checks to see if it understands the format.
852 '''
857 '''
853 if not streamclone.allowservergeneration(repo.ui):
858 if not streamclone.allowservergeneration(repo.ui):
854 return '1\n'
859 return '1\n'
855
860
856 def getstream(it):
861 def getstream(it):
857 yield '0\n'
862 yield '0\n'
858 for chunk in it:
863 for chunk in it:
859 yield chunk
864 yield chunk
860
865
861 try:
866 try:
862 # LockError may be raised before the first result is yielded. Don't
867 # LockError may be raised before the first result is yielded. Don't
863 # emit output until we're sure we got the lock successfully.
868 # emit output until we're sure we got the lock successfully.
864 it = streamclone.generatev1wireproto(repo)
869 it = streamclone.generatev1wireproto(repo)
865 return streamres(getstream(it))
870 return streamres(getstream(it))
866 except error.LockError:
871 except error.LockError:
867 return '2\n'
872 return '2\n'
868
873
869 @wireprotocommand('unbundle', 'heads')
874 @wireprotocommand('unbundle', 'heads')
870 def unbundle(repo, proto, heads):
875 def unbundle(repo, proto, heads):
871 their_heads = decodelist(heads)
876 their_heads = decodelist(heads)
872
877
873 try:
878 try:
874 proto.redirect()
879 proto.redirect()
875
880
876 exchange.check_heads(repo, their_heads, 'preparing changes')
881 exchange.check_heads(repo, their_heads, 'preparing changes')
877
882
878 # write bundle data to temporary file because it can be big
883 # write bundle data to temporary file because it can be big
879 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
884 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
880 fp = os.fdopen(fd, 'wb+')
885 fp = os.fdopen(fd, 'wb+')
881 r = 0
886 r = 0
882 try:
887 try:
883 proto.getfile(fp)
888 proto.getfile(fp)
884 fp.seek(0)
889 fp.seek(0)
885 gen = exchange.readbundle(repo.ui, fp, None)
890 gen = exchange.readbundle(repo.ui, fp, None)
886 if (isinstance(gen, changegroupmod.cg1unpacker)
891 if (isinstance(gen, changegroupmod.cg1unpacker)
887 and not bundle1allowed(repo, 'push')):
892 and not bundle1allowed(repo, 'push')):
888 return ooberror(bundle2required)
893 return ooberror(bundle2required)
889
894
890 r = exchange.unbundle(repo, gen, their_heads, 'serve',
895 r = exchange.unbundle(repo, gen, their_heads, 'serve',
891 proto._client())
896 proto._client())
892 if util.safehasattr(r, 'addpart'):
897 if util.safehasattr(r, 'addpart'):
893 # The return looks streamable, we are in the bundle2 case and
898 # The return looks streamable, we are in the bundle2 case and
894 # should return a stream.
899 # should return a stream.
895 return streamres(r.getchunks())
900 return streamres(r.getchunks())
896 return pushres(r)
901 return pushres(r)
897
902
898 finally:
903 finally:
899 fp.close()
904 fp.close()
900 os.unlink(tempname)
905 os.unlink(tempname)
901
906
902 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
907 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
903 # handle non-bundle2 case first
908 # handle non-bundle2 case first
904 if not getattr(exc, 'duringunbundle2', False):
909 if not getattr(exc, 'duringunbundle2', False):
905 try:
910 try:
906 raise
911 raise
907 except error.Abort:
912 except error.Abort:
908 # The old code we moved used sys.stderr directly.
913 # The old code we moved used sys.stderr directly.
909 # We did not change it to minimise code change.
914 # We did not change it to minimise code change.
910 # This need to be moved to something proper.
915 # This need to be moved to something proper.
911 # Feel free to do it.
916 # Feel free to do it.
912 sys.stderr.write("abort: %s\n" % exc)
917 sys.stderr.write("abort: %s\n" % exc)
913 return pushres(0)
918 return pushres(0)
914 except error.PushRaced:
919 except error.PushRaced:
915 return pusherr(str(exc))
920 return pusherr(str(exc))
916
921
917 bundler = bundle2.bundle20(repo.ui)
922 bundler = bundle2.bundle20(repo.ui)
918 for out in getattr(exc, '_bundle2salvagedoutput', ()):
923 for out in getattr(exc, '_bundle2salvagedoutput', ()):
919 bundler.addpart(out)
924 bundler.addpart(out)
920 try:
925 try:
921 try:
926 try:
922 raise
927 raise
923 except error.PushkeyFailed as exc:
928 except error.PushkeyFailed as exc:
924 # check client caps
929 # check client caps
925 remotecaps = getattr(exc, '_replycaps', None)
930 remotecaps = getattr(exc, '_replycaps', None)
926 if (remotecaps is not None
931 if (remotecaps is not None
927 and 'pushkey' not in remotecaps.get('error', ())):
932 and 'pushkey' not in remotecaps.get('error', ())):
928 # no support remote side, fallback to Abort handler.
933 # no support remote side, fallback to Abort handler.
929 raise
934 raise
930 part = bundler.newpart('error:pushkey')
935 part = bundler.newpart('error:pushkey')
931 part.addparam('in-reply-to', exc.partid)
936 part.addparam('in-reply-to', exc.partid)
932 if exc.namespace is not None:
937 if exc.namespace is not None:
933 part.addparam('namespace', exc.namespace, mandatory=False)
938 part.addparam('namespace', exc.namespace, mandatory=False)
934 if exc.key is not None:
939 if exc.key is not None:
935 part.addparam('key', exc.key, mandatory=False)
940 part.addparam('key', exc.key, mandatory=False)
936 if exc.new is not None:
941 if exc.new is not None:
937 part.addparam('new', exc.new, mandatory=False)
942 part.addparam('new', exc.new, mandatory=False)
938 if exc.old is not None:
943 if exc.old is not None:
939 part.addparam('old', exc.old, mandatory=False)
944 part.addparam('old', exc.old, mandatory=False)
940 if exc.ret is not None:
945 if exc.ret is not None:
941 part.addparam('ret', exc.ret, mandatory=False)
946 part.addparam('ret', exc.ret, mandatory=False)
942 except error.BundleValueError as exc:
947 except error.BundleValueError as exc:
943 errpart = bundler.newpart('error:unsupportedcontent')
948 errpart = bundler.newpart('error:unsupportedcontent')
944 if exc.parttype is not None:
949 if exc.parttype is not None:
945 errpart.addparam('parttype', exc.parttype)
950 errpart.addparam('parttype', exc.parttype)
946 if exc.params:
951 if exc.params:
947 errpart.addparam('params', '\0'.join(exc.params))
952 errpart.addparam('params', '\0'.join(exc.params))
948 except error.Abort as exc:
953 except error.Abort as exc:
949 manargs = [('message', str(exc))]
954 manargs = [('message', str(exc))]
950 advargs = []
955 advargs = []
951 if exc.hint is not None:
956 if exc.hint is not None:
952 advargs.append(('hint', exc.hint))
957 advargs.append(('hint', exc.hint))
953 bundler.addpart(bundle2.bundlepart('error:abort',
958 bundler.addpart(bundle2.bundlepart('error:abort',
954 manargs, advargs))
959 manargs, advargs))
955 except error.PushRaced as exc:
960 except error.PushRaced as exc:
956 bundler.newpart('error:pushraced', [('message', str(exc))])
961 bundler.newpart('error:pushraced', [('message', str(exc))])
957 return streamres(bundler.getchunks())
962 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now