##// END OF EJS Templates
wireproto: extract repo filtering to standalone function...
Gregory Szorc -
r29590:84c1a594 default
parent child Browse files
Show More
@@ -1,939 +1,948 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import itertools
11 import itertools
12 import os
12 import os
13 import sys
13 import sys
14 import tempfile
14 import tempfile
15
15
16 from .i18n import _
16 from .i18n import _
17 from .node import (
17 from .node import (
18 bin,
18 bin,
19 hex,
19 hex,
20 )
20 )
21
21
22 from . import (
22 from . import (
23 bundle2,
23 bundle2,
24 changegroup as changegroupmod,
24 changegroup as changegroupmod,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 streamclone,
30 streamclone,
31 util,
31 util,
32 )
32 )
33
33
34 urlerr = util.urlerr
34 urlerr = util.urlerr
35 urlreq = util.urlreq
35 urlreq = util.urlreq
36
36
37 bundle2required = _(
37 bundle2required = _(
38 'incompatible Mercurial client; bundle2 required\n'
38 'incompatible Mercurial client; bundle2 required\n'
39 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
39 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
40
40
41 class abstractserverproto(object):
41 class abstractserverproto(object):
42 """abstract class that summarizes the protocol API
42 """abstract class that summarizes the protocol API
43
43
44 Used as reference and documentation.
44 Used as reference and documentation.
45 """
45 """
46
46
47 def getargs(self, args):
47 def getargs(self, args):
48 """return the value for arguments in <args>
48 """return the value for arguments in <args>
49
49
50 returns a list of values (same order as <args>)"""
50 returns a list of values (same order as <args>)"""
51 raise NotImplementedError()
51 raise NotImplementedError()
52
52
53 def getfile(self, fp):
53 def getfile(self, fp):
54 """write the whole content of a file into a file like object
54 """write the whole content of a file into a file like object
55
55
56 The file is in the form::
56 The file is in the form::
57
57
58 (<chunk-size>\n<chunk>)+0\n
58 (<chunk-size>\n<chunk>)+0\n
59
59
60 chunk size is the ascii version of the int.
60 chunk size is the ascii version of the int.
61 """
61 """
62 raise NotImplementedError()
62 raise NotImplementedError()
63
63
64 def redirect(self):
64 def redirect(self):
65 """may setup interception for stdout and stderr
65 """may setup interception for stdout and stderr
66
66
67 See also the `restore` method."""
67 See also the `restore` method."""
68 raise NotImplementedError()
68 raise NotImplementedError()
69
69
70 # If the `redirect` function does install interception, the `restore`
70 # If the `redirect` function does install interception, the `restore`
71 # function MUST be defined. If interception is not used, this function
71 # function MUST be defined. If interception is not used, this function
72 # MUST NOT be defined.
72 # MUST NOT be defined.
73 #
73 #
74 # left commented here on purpose
74 # left commented here on purpose
75 #
75 #
76 #def restore(self):
76 #def restore(self):
77 # """reinstall previous stdout and stderr and return intercepted stdout
77 # """reinstall previous stdout and stderr and return intercepted stdout
78 # """
78 # """
79 # raise NotImplementedError()
79 # raise NotImplementedError()
80
80
81 def groupchunks(self, cg):
81 def groupchunks(self, cg):
82 """return 4096 chunks from a changegroup object
82 """return 4096 chunks from a changegroup object
83
83
84 Some protocols may have compressed the contents."""
84 Some protocols may have compressed the contents."""
85 raise NotImplementedError()
85 raise NotImplementedError()
86
86
87 class remotebatch(peer.batcher):
87 class remotebatch(peer.batcher):
88 '''batches the queued calls; uses as few roundtrips as possible'''
88 '''batches the queued calls; uses as few roundtrips as possible'''
89 def __init__(self, remote):
89 def __init__(self, remote):
90 '''remote must support _submitbatch(encbatch) and
90 '''remote must support _submitbatch(encbatch) and
91 _submitone(op, encargs)'''
91 _submitone(op, encargs)'''
92 peer.batcher.__init__(self)
92 peer.batcher.__init__(self)
93 self.remote = remote
93 self.remote = remote
94 def submit(self):
94 def submit(self):
95 req, rsp = [], []
95 req, rsp = [], []
96 for name, args, opts, resref in self.calls:
96 for name, args, opts, resref in self.calls:
97 mtd = getattr(self.remote, name)
97 mtd = getattr(self.remote, name)
98 batchablefn = getattr(mtd, 'batchable', None)
98 batchablefn = getattr(mtd, 'batchable', None)
99 if batchablefn is not None:
99 if batchablefn is not None:
100 batchable = batchablefn(mtd.im_self, *args, **opts)
100 batchable = batchablefn(mtd.im_self, *args, **opts)
101 encargsorres, encresref = next(batchable)
101 encargsorres, encresref = next(batchable)
102 if encresref:
102 if encresref:
103 req.append((name, encargsorres,))
103 req.append((name, encargsorres,))
104 rsp.append((batchable, encresref, resref,))
104 rsp.append((batchable, encresref, resref,))
105 else:
105 else:
106 resref.set(encargsorres)
106 resref.set(encargsorres)
107 else:
107 else:
108 if req:
108 if req:
109 self._submitreq(req, rsp)
109 self._submitreq(req, rsp)
110 req, rsp = [], []
110 req, rsp = [], []
111 resref.set(mtd(*args, **opts))
111 resref.set(mtd(*args, **opts))
112 if req:
112 if req:
113 self._submitreq(req, rsp)
113 self._submitreq(req, rsp)
114 def _submitreq(self, req, rsp):
114 def _submitreq(self, req, rsp):
115 encresults = self.remote._submitbatch(req)
115 encresults = self.remote._submitbatch(req)
116 for encres, r in zip(encresults, rsp):
116 for encres, r in zip(encresults, rsp):
117 batchable, encresref, resref = r
117 batchable, encresref, resref = r
118 encresref.set(encres)
118 encresref.set(encres)
119 resref.set(next(batchable))
119 resref.set(next(batchable))
120
120
121 class remoteiterbatcher(peer.iterbatcher):
121 class remoteiterbatcher(peer.iterbatcher):
122 def __init__(self, remote):
122 def __init__(self, remote):
123 super(remoteiterbatcher, self).__init__()
123 super(remoteiterbatcher, self).__init__()
124 self._remote = remote
124 self._remote = remote
125
125
126 def __getattr__(self, name):
126 def __getattr__(self, name):
127 if not getattr(self._remote, name, False):
127 if not getattr(self._remote, name, False):
128 raise AttributeError(
128 raise AttributeError(
129 'Attempted to iterbatch non-batchable call to %r' % name)
129 'Attempted to iterbatch non-batchable call to %r' % name)
130 return super(remoteiterbatcher, self).__getattr__(name)
130 return super(remoteiterbatcher, self).__getattr__(name)
131
131
132 def submit(self):
132 def submit(self):
133 """Break the batch request into many patch calls and pipeline them.
133 """Break the batch request into many patch calls and pipeline them.
134
134
135 This is mostly valuable over http where request sizes can be
135 This is mostly valuable over http where request sizes can be
136 limited, but can be used in other places as well.
136 limited, but can be used in other places as well.
137 """
137 """
138 req, rsp = [], []
138 req, rsp = [], []
139 for name, args, opts, resref in self.calls:
139 for name, args, opts, resref in self.calls:
140 mtd = getattr(self._remote, name)
140 mtd = getattr(self._remote, name)
141 batchable = mtd.batchable(mtd.im_self, *args, **opts)
141 batchable = mtd.batchable(mtd.im_self, *args, **opts)
142 encargsorres, encresref = next(batchable)
142 encargsorres, encresref = next(batchable)
143 assert encresref
143 assert encresref
144 req.append((name, encargsorres))
144 req.append((name, encargsorres))
145 rsp.append((batchable, encresref))
145 rsp.append((batchable, encresref))
146 if req:
146 if req:
147 self._resultiter = self._remote._submitbatch(req)
147 self._resultiter = self._remote._submitbatch(req)
148 self._rsp = rsp
148 self._rsp = rsp
149
149
150 def results(self):
150 def results(self):
151 for (batchable, encresref), encres in itertools.izip(
151 for (batchable, encresref), encres in itertools.izip(
152 self._rsp, self._resultiter):
152 self._rsp, self._resultiter):
153 encresref.set(encres)
153 encresref.set(encres)
154 yield next(batchable)
154 yield next(batchable)
155
155
156 # Forward a couple of names from peer to make wireproto interactions
156 # Forward a couple of names from peer to make wireproto interactions
157 # slightly more sensible.
157 # slightly more sensible.
158 batchable = peer.batchable
158 batchable = peer.batchable
159 future = peer.future
159 future = peer.future
160
160
161 # list of nodes encoding / decoding
161 # list of nodes encoding / decoding
162
162
163 def decodelist(l, sep=' '):
163 def decodelist(l, sep=' '):
164 if l:
164 if l:
165 return map(bin, l.split(sep))
165 return map(bin, l.split(sep))
166 return []
166 return []
167
167
168 def encodelist(l, sep=' '):
168 def encodelist(l, sep=' '):
169 try:
169 try:
170 return sep.join(map(hex, l))
170 return sep.join(map(hex, l))
171 except TypeError:
171 except TypeError:
172 raise
172 raise
173
173
174 # batched call argument encoding
174 # batched call argument encoding
175
175
176 def escapearg(plain):
176 def escapearg(plain):
177 return (plain
177 return (plain
178 .replace(':', ':c')
178 .replace(':', ':c')
179 .replace(',', ':o')
179 .replace(',', ':o')
180 .replace(';', ':s')
180 .replace(';', ':s')
181 .replace('=', ':e'))
181 .replace('=', ':e'))
182
182
183 def unescapearg(escaped):
183 def unescapearg(escaped):
184 return (escaped
184 return (escaped
185 .replace(':e', '=')
185 .replace(':e', '=')
186 .replace(':s', ';')
186 .replace(':s', ';')
187 .replace(':o', ',')
187 .replace(':o', ',')
188 .replace(':c', ':'))
188 .replace(':c', ':'))
189
189
190 # mapping of options accepted by getbundle and their types
190 # mapping of options accepted by getbundle and their types
191 #
191 #
192 # Meant to be extended by extensions. It is extensions responsibility to ensure
192 # Meant to be extended by extensions. It is extensions responsibility to ensure
193 # such options are properly processed in exchange.getbundle.
193 # such options are properly processed in exchange.getbundle.
194 #
194 #
195 # supported types are:
195 # supported types are:
196 #
196 #
197 # :nodes: list of binary nodes
197 # :nodes: list of binary nodes
198 # :csv: list of comma-separated values
198 # :csv: list of comma-separated values
199 # :scsv: list of comma-separated values return as set
199 # :scsv: list of comma-separated values return as set
200 # :plain: string with no transformation needed.
200 # :plain: string with no transformation needed.
201 gboptsmap = {'heads': 'nodes',
201 gboptsmap = {'heads': 'nodes',
202 'common': 'nodes',
202 'common': 'nodes',
203 'obsmarkers': 'boolean',
203 'obsmarkers': 'boolean',
204 'bundlecaps': 'scsv',
204 'bundlecaps': 'scsv',
205 'listkeys': 'csv',
205 'listkeys': 'csv',
206 'cg': 'boolean',
206 'cg': 'boolean',
207 'cbattempted': 'boolean'}
207 'cbattempted': 'boolean'}
208
208
209 # client side
209 # client side
210
210
211 class wirepeer(peer.peerrepository):
211 class wirepeer(peer.peerrepository):
212 """Client-side interface for communicating with a peer repository.
212 """Client-side interface for communicating with a peer repository.
213
213
214 Methods commonly call wire protocol commands of the same name.
214 Methods commonly call wire protocol commands of the same name.
215
215
216 See also httppeer.py and sshpeer.py for protocol-specific
216 See also httppeer.py and sshpeer.py for protocol-specific
217 implementations of this interface.
217 implementations of this interface.
218 """
218 """
219 def batch(self):
219 def batch(self):
220 if self.capable('batch'):
220 if self.capable('batch'):
221 return remotebatch(self)
221 return remotebatch(self)
222 else:
222 else:
223 return peer.localbatch(self)
223 return peer.localbatch(self)
224 def _submitbatch(self, req):
224 def _submitbatch(self, req):
225 """run batch request <req> on the server
225 """run batch request <req> on the server
226
226
227 Returns an iterator of the raw responses from the server.
227 Returns an iterator of the raw responses from the server.
228 """
228 """
229 cmds = []
229 cmds = []
230 for op, argsdict in req:
230 for op, argsdict in req:
231 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
231 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
232 for k, v in argsdict.iteritems())
232 for k, v in argsdict.iteritems())
233 cmds.append('%s %s' % (op, args))
233 cmds.append('%s %s' % (op, args))
234 rsp = self._callstream("batch", cmds=';'.join(cmds))
234 rsp = self._callstream("batch", cmds=';'.join(cmds))
235 chunk = rsp.read(1024)
235 chunk = rsp.read(1024)
236 work = [chunk]
236 work = [chunk]
237 while chunk:
237 while chunk:
238 while ';' not in chunk and chunk:
238 while ';' not in chunk and chunk:
239 chunk = rsp.read(1024)
239 chunk = rsp.read(1024)
240 work.append(chunk)
240 work.append(chunk)
241 merged = ''.join(work)
241 merged = ''.join(work)
242 while ';' in merged:
242 while ';' in merged:
243 one, merged = merged.split(';', 1)
243 one, merged = merged.split(';', 1)
244 yield unescapearg(one)
244 yield unescapearg(one)
245 chunk = rsp.read(1024)
245 chunk = rsp.read(1024)
246 work = [merged, chunk]
246 work = [merged, chunk]
247 yield unescapearg(''.join(work))
247 yield unescapearg(''.join(work))
248
248
249 def _submitone(self, op, args):
249 def _submitone(self, op, args):
250 return self._call(op, **args)
250 return self._call(op, **args)
251
251
252 def iterbatch(self):
252 def iterbatch(self):
253 return remoteiterbatcher(self)
253 return remoteiterbatcher(self)
254
254
255 @batchable
255 @batchable
256 def lookup(self, key):
256 def lookup(self, key):
257 self.requirecap('lookup', _('look up remote revision'))
257 self.requirecap('lookup', _('look up remote revision'))
258 f = future()
258 f = future()
259 yield {'key': encoding.fromlocal(key)}, f
259 yield {'key': encoding.fromlocal(key)}, f
260 d = f.value
260 d = f.value
261 success, data = d[:-1].split(" ", 1)
261 success, data = d[:-1].split(" ", 1)
262 if int(success):
262 if int(success):
263 yield bin(data)
263 yield bin(data)
264 self._abort(error.RepoError(data))
264 self._abort(error.RepoError(data))
265
265
266 @batchable
266 @batchable
267 def heads(self):
267 def heads(self):
268 f = future()
268 f = future()
269 yield {}, f
269 yield {}, f
270 d = f.value
270 d = f.value
271 try:
271 try:
272 yield decodelist(d[:-1])
272 yield decodelist(d[:-1])
273 except ValueError:
273 except ValueError:
274 self._abort(error.ResponseError(_("unexpected response:"), d))
274 self._abort(error.ResponseError(_("unexpected response:"), d))
275
275
276 @batchable
276 @batchable
277 def known(self, nodes):
277 def known(self, nodes):
278 f = future()
278 f = future()
279 yield {'nodes': encodelist(nodes)}, f
279 yield {'nodes': encodelist(nodes)}, f
280 d = f.value
280 d = f.value
281 try:
281 try:
282 yield [bool(int(b)) for b in d]
282 yield [bool(int(b)) for b in d]
283 except ValueError:
283 except ValueError:
284 self._abort(error.ResponseError(_("unexpected response:"), d))
284 self._abort(error.ResponseError(_("unexpected response:"), d))
285
285
286 @batchable
286 @batchable
287 def branchmap(self):
287 def branchmap(self):
288 f = future()
288 f = future()
289 yield {}, f
289 yield {}, f
290 d = f.value
290 d = f.value
291 try:
291 try:
292 branchmap = {}
292 branchmap = {}
293 for branchpart in d.splitlines():
293 for branchpart in d.splitlines():
294 branchname, branchheads = branchpart.split(' ', 1)
294 branchname, branchheads = branchpart.split(' ', 1)
295 branchname = encoding.tolocal(urlreq.unquote(branchname))
295 branchname = encoding.tolocal(urlreq.unquote(branchname))
296 branchheads = decodelist(branchheads)
296 branchheads = decodelist(branchheads)
297 branchmap[branchname] = branchheads
297 branchmap[branchname] = branchheads
298 yield branchmap
298 yield branchmap
299 except TypeError:
299 except TypeError:
300 self._abort(error.ResponseError(_("unexpected response:"), d))
300 self._abort(error.ResponseError(_("unexpected response:"), d))
301
301
302 def branches(self, nodes):
302 def branches(self, nodes):
303 n = encodelist(nodes)
303 n = encodelist(nodes)
304 d = self._call("branches", nodes=n)
304 d = self._call("branches", nodes=n)
305 try:
305 try:
306 br = [tuple(decodelist(b)) for b in d.splitlines()]
306 br = [tuple(decodelist(b)) for b in d.splitlines()]
307 return br
307 return br
308 except ValueError:
308 except ValueError:
309 self._abort(error.ResponseError(_("unexpected response:"), d))
309 self._abort(error.ResponseError(_("unexpected response:"), d))
310
310
311 def between(self, pairs):
311 def between(self, pairs):
312 batch = 8 # avoid giant requests
312 batch = 8 # avoid giant requests
313 r = []
313 r = []
314 for i in xrange(0, len(pairs), batch):
314 for i in xrange(0, len(pairs), batch):
315 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
315 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
316 d = self._call("between", pairs=n)
316 d = self._call("between", pairs=n)
317 try:
317 try:
318 r.extend(l and decodelist(l) or [] for l in d.splitlines())
318 r.extend(l and decodelist(l) or [] for l in d.splitlines())
319 except ValueError:
319 except ValueError:
320 self._abort(error.ResponseError(_("unexpected response:"), d))
320 self._abort(error.ResponseError(_("unexpected response:"), d))
321 return r
321 return r
322
322
323 @batchable
323 @batchable
324 def pushkey(self, namespace, key, old, new):
324 def pushkey(self, namespace, key, old, new):
325 if not self.capable('pushkey'):
325 if not self.capable('pushkey'):
326 yield False, None
326 yield False, None
327 f = future()
327 f = future()
328 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
328 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
329 yield {'namespace': encoding.fromlocal(namespace),
329 yield {'namespace': encoding.fromlocal(namespace),
330 'key': encoding.fromlocal(key),
330 'key': encoding.fromlocal(key),
331 'old': encoding.fromlocal(old),
331 'old': encoding.fromlocal(old),
332 'new': encoding.fromlocal(new)}, f
332 'new': encoding.fromlocal(new)}, f
333 d = f.value
333 d = f.value
334 d, output = d.split('\n', 1)
334 d, output = d.split('\n', 1)
335 try:
335 try:
336 d = bool(int(d))
336 d = bool(int(d))
337 except ValueError:
337 except ValueError:
338 raise error.ResponseError(
338 raise error.ResponseError(
339 _('push failed (unexpected response):'), d)
339 _('push failed (unexpected response):'), d)
340 for l in output.splitlines(True):
340 for l in output.splitlines(True):
341 self.ui.status(_('remote: '), l)
341 self.ui.status(_('remote: '), l)
342 yield d
342 yield d
343
343
344 @batchable
344 @batchable
345 def listkeys(self, namespace):
345 def listkeys(self, namespace):
346 if not self.capable('pushkey'):
346 if not self.capable('pushkey'):
347 yield {}, None
347 yield {}, None
348 f = future()
348 f = future()
349 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
349 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
350 yield {'namespace': encoding.fromlocal(namespace)}, f
350 yield {'namespace': encoding.fromlocal(namespace)}, f
351 d = f.value
351 d = f.value
352 self.ui.debug('received listkey for "%s": %i bytes\n'
352 self.ui.debug('received listkey for "%s": %i bytes\n'
353 % (namespace, len(d)))
353 % (namespace, len(d)))
354 yield pushkeymod.decodekeys(d)
354 yield pushkeymod.decodekeys(d)
355
355
356 def stream_out(self):
356 def stream_out(self):
357 return self._callstream('stream_out')
357 return self._callstream('stream_out')
358
358
359 def changegroup(self, nodes, kind):
359 def changegroup(self, nodes, kind):
360 n = encodelist(nodes)
360 n = encodelist(nodes)
361 f = self._callcompressable("changegroup", roots=n)
361 f = self._callcompressable("changegroup", roots=n)
362 return changegroupmod.cg1unpacker(f, 'UN')
362 return changegroupmod.cg1unpacker(f, 'UN')
363
363
364 def changegroupsubset(self, bases, heads, kind):
364 def changegroupsubset(self, bases, heads, kind):
365 self.requirecap('changegroupsubset', _('look up remote changes'))
365 self.requirecap('changegroupsubset', _('look up remote changes'))
366 bases = encodelist(bases)
366 bases = encodelist(bases)
367 heads = encodelist(heads)
367 heads = encodelist(heads)
368 f = self._callcompressable("changegroupsubset",
368 f = self._callcompressable("changegroupsubset",
369 bases=bases, heads=heads)
369 bases=bases, heads=heads)
370 return changegroupmod.cg1unpacker(f, 'UN')
370 return changegroupmod.cg1unpacker(f, 'UN')
371
371
372 def getbundle(self, source, **kwargs):
372 def getbundle(self, source, **kwargs):
373 self.requirecap('getbundle', _('look up remote changes'))
373 self.requirecap('getbundle', _('look up remote changes'))
374 opts = {}
374 opts = {}
375 bundlecaps = kwargs.get('bundlecaps')
375 bundlecaps = kwargs.get('bundlecaps')
376 if bundlecaps is not None:
376 if bundlecaps is not None:
377 kwargs['bundlecaps'] = sorted(bundlecaps)
377 kwargs['bundlecaps'] = sorted(bundlecaps)
378 else:
378 else:
379 bundlecaps = () # kwargs could have it to None
379 bundlecaps = () # kwargs could have it to None
380 for key, value in kwargs.iteritems():
380 for key, value in kwargs.iteritems():
381 if value is None:
381 if value is None:
382 continue
382 continue
383 keytype = gboptsmap.get(key)
383 keytype = gboptsmap.get(key)
384 if keytype is None:
384 if keytype is None:
385 assert False, 'unexpected'
385 assert False, 'unexpected'
386 elif keytype == 'nodes':
386 elif keytype == 'nodes':
387 value = encodelist(value)
387 value = encodelist(value)
388 elif keytype in ('csv', 'scsv'):
388 elif keytype in ('csv', 'scsv'):
389 value = ','.join(value)
389 value = ','.join(value)
390 elif keytype == 'boolean':
390 elif keytype == 'boolean':
391 value = '%i' % bool(value)
391 value = '%i' % bool(value)
392 elif keytype != 'plain':
392 elif keytype != 'plain':
393 raise KeyError('unknown getbundle option type %s'
393 raise KeyError('unknown getbundle option type %s'
394 % keytype)
394 % keytype)
395 opts[key] = value
395 opts[key] = value
396 f = self._callcompressable("getbundle", **opts)
396 f = self._callcompressable("getbundle", **opts)
397 if any((cap.startswith('HG2') for cap in bundlecaps)):
397 if any((cap.startswith('HG2') for cap in bundlecaps)):
398 return bundle2.getunbundler(self.ui, f)
398 return bundle2.getunbundler(self.ui, f)
399 else:
399 else:
400 return changegroupmod.cg1unpacker(f, 'UN')
400 return changegroupmod.cg1unpacker(f, 'UN')
401
401
402 def unbundle(self, cg, heads, source):
402 def unbundle(self, cg, heads, source):
403 '''Send cg (a readable file-like object representing the
403 '''Send cg (a readable file-like object representing the
404 changegroup to push, typically a chunkbuffer object) to the
404 changegroup to push, typically a chunkbuffer object) to the
405 remote server as a bundle.
405 remote server as a bundle.
406
406
407 When pushing a bundle10 stream, return an integer indicating the
407 When pushing a bundle10 stream, return an integer indicating the
408 result of the push (see localrepository.addchangegroup()).
408 result of the push (see localrepository.addchangegroup()).
409
409
410 When pushing a bundle20 stream, return a bundle20 stream.'''
410 When pushing a bundle20 stream, return a bundle20 stream.'''
411
411
412 if heads != ['force'] and self.capable('unbundlehash'):
412 if heads != ['force'] and self.capable('unbundlehash'):
413 heads = encodelist(['hashed',
413 heads = encodelist(['hashed',
414 hashlib.sha1(''.join(sorted(heads))).digest()])
414 hashlib.sha1(''.join(sorted(heads))).digest()])
415 else:
415 else:
416 heads = encodelist(heads)
416 heads = encodelist(heads)
417
417
418 if util.safehasattr(cg, 'deltaheader'):
418 if util.safehasattr(cg, 'deltaheader'):
419 # this a bundle10, do the old style call sequence
419 # this a bundle10, do the old style call sequence
420 ret, output = self._callpush("unbundle", cg, heads=heads)
420 ret, output = self._callpush("unbundle", cg, heads=heads)
421 if ret == "":
421 if ret == "":
422 raise error.ResponseError(
422 raise error.ResponseError(
423 _('push failed:'), output)
423 _('push failed:'), output)
424 try:
424 try:
425 ret = int(ret)
425 ret = int(ret)
426 except ValueError:
426 except ValueError:
427 raise error.ResponseError(
427 raise error.ResponseError(
428 _('push failed (unexpected response):'), ret)
428 _('push failed (unexpected response):'), ret)
429
429
430 for l in output.splitlines(True):
430 for l in output.splitlines(True):
431 self.ui.status(_('remote: '), l)
431 self.ui.status(_('remote: '), l)
432 else:
432 else:
433 # bundle2 push. Send a stream, fetch a stream.
433 # bundle2 push. Send a stream, fetch a stream.
434 stream = self._calltwowaystream('unbundle', cg, heads=heads)
434 stream = self._calltwowaystream('unbundle', cg, heads=heads)
435 ret = bundle2.getunbundler(self.ui, stream)
435 ret = bundle2.getunbundler(self.ui, stream)
436 return ret
436 return ret
437
437
438 def debugwireargs(self, one, two, three=None, four=None, five=None):
438 def debugwireargs(self, one, two, three=None, four=None, five=None):
439 # don't pass optional arguments left at their default value
439 # don't pass optional arguments left at their default value
440 opts = {}
440 opts = {}
441 if three is not None:
441 if three is not None:
442 opts['three'] = three
442 opts['three'] = three
443 if four is not None:
443 if four is not None:
444 opts['four'] = four
444 opts['four'] = four
445 return self._call('debugwireargs', one=one, two=two, **opts)
445 return self._call('debugwireargs', one=one, two=two, **opts)
446
446
447 def _call(self, cmd, **args):
447 def _call(self, cmd, **args):
448 """execute <cmd> on the server
448 """execute <cmd> on the server
449
449
450 The command is expected to return a simple string.
450 The command is expected to return a simple string.
451
451
452 returns the server reply as a string."""
452 returns the server reply as a string."""
453 raise NotImplementedError()
453 raise NotImplementedError()
454
454
455 def _callstream(self, cmd, **args):
455 def _callstream(self, cmd, **args):
456 """execute <cmd> on the server
456 """execute <cmd> on the server
457
457
458 The command is expected to return a stream. Note that if the
458 The command is expected to return a stream. Note that if the
459 command doesn't return a stream, _callstream behaves
459 command doesn't return a stream, _callstream behaves
460 differently for ssh and http peers.
460 differently for ssh and http peers.
461
461
462 returns the server reply as a file like object.
462 returns the server reply as a file like object.
463 """
463 """
464 raise NotImplementedError()
464 raise NotImplementedError()
465
465
466 def _callcompressable(self, cmd, **args):
466 def _callcompressable(self, cmd, **args):
467 """execute <cmd> on the server
467 """execute <cmd> on the server
468
468
469 The command is expected to return a stream.
469 The command is expected to return a stream.
470
470
471 The stream may have been compressed in some implementations. This
471 The stream may have been compressed in some implementations. This
472 function takes care of the decompression. This is the only difference
472 function takes care of the decompression. This is the only difference
473 with _callstream.
473 with _callstream.
474
474
475 returns the server reply as a file like object.
475 returns the server reply as a file like object.
476 """
476 """
477 raise NotImplementedError()
477 raise NotImplementedError()
478
478
479 def _callpush(self, cmd, fp, **args):
479 def _callpush(self, cmd, fp, **args):
480 """execute a <cmd> on server
480 """execute a <cmd> on server
481
481
482 The command is expected to be related to a push. Push has a special
482 The command is expected to be related to a push. Push has a special
483 return method.
483 return method.
484
484
485 returns the server reply as a (ret, output) tuple. ret is either
485 returns the server reply as a (ret, output) tuple. ret is either
486 empty (error) or a stringified int.
486 empty (error) or a stringified int.
487 """
487 """
488 raise NotImplementedError()
488 raise NotImplementedError()
489
489
490 def _calltwowaystream(self, cmd, fp, **args):
490 def _calltwowaystream(self, cmd, fp, **args):
491 """execute <cmd> on server
491 """execute <cmd> on server
492
492
493 The command will send a stream to the server and get a stream in reply.
493 The command will send a stream to the server and get a stream in reply.
494 """
494 """
495 raise NotImplementedError()
495 raise NotImplementedError()
496
496
497 def _abort(self, exception):
497 def _abort(self, exception):
498 """clearly abort the wire protocol connection and raise the exception
498 """clearly abort the wire protocol connection and raise the exception
499 """
499 """
500 raise NotImplementedError()
500 raise NotImplementedError()
501
501
502 # server side
502 # server side
503
503
504 # wire protocol command can either return a string or one of these classes.
504 # wire protocol command can either return a string or one of these classes.
505 class streamres(object):
505 class streamres(object):
506 """wireproto reply: binary stream
506 """wireproto reply: binary stream
507
507
508 The call was successful and the result is a stream.
508 The call was successful and the result is a stream.
509 Iterate on the `self.gen` attribute to retrieve chunks.
509 Iterate on the `self.gen` attribute to retrieve chunks.
510 """
510 """
511 def __init__(self, gen):
511 def __init__(self, gen):
512 self.gen = gen
512 self.gen = gen
513
513
514 class pushres(object):
514 class pushres(object):
515 """wireproto reply: success with simple integer return
515 """wireproto reply: success with simple integer return
516
516
517 The call was successful and returned an integer contained in `self.res`.
517 The call was successful and returned an integer contained in `self.res`.
518 """
518 """
519 def __init__(self, res):
519 def __init__(self, res):
520 self.res = res
520 self.res = res
521
521
522 class pusherr(object):
522 class pusherr(object):
523 """wireproto reply: failure
523 """wireproto reply: failure
524
524
525 The call failed. The `self.res` attribute contains the error message.
525 The call failed. The `self.res` attribute contains the error message.
526 """
526 """
527 def __init__(self, res):
527 def __init__(self, res):
528 self.res = res
528 self.res = res
529
529
530 class ooberror(object):
530 class ooberror(object):
531 """wireproto reply: failure of a batch of operation
531 """wireproto reply: failure of a batch of operation
532
532
533 Something failed during a batch call. The error message is stored in
533 Something failed during a batch call. The error message is stored in
534 `self.message`.
534 `self.message`.
535 """
535 """
536 def __init__(self, message):
536 def __init__(self, message):
537 self.message = message
537 self.message = message
538
538
539 def getdispatchrepo(repo, proto, command):
540 """Obtain the repo used for processing wire protocol commands.
541
542 The intent of this function is to serve as a monkeypatch point for
543 extensions that need commands to operate on different repo views under
544 specialized circumstances.
545 """
546 return repo.filtered('served')
547
539 def dispatch(repo, proto, command):
548 def dispatch(repo, proto, command):
540 repo = repo.filtered("served")
549 repo = getdispatchrepo(repo, proto, command)
541 func, spec = commands[command]
550 func, spec = commands[command]
542 args = proto.getargs(spec)
551 args = proto.getargs(spec)
543 return func(repo, proto, *args)
552 return func(repo, proto, *args)
544
553
545 def options(cmd, keys, others):
554 def options(cmd, keys, others):
546 opts = {}
555 opts = {}
547 for k in keys:
556 for k in keys:
548 if k in others:
557 if k in others:
549 opts[k] = others[k]
558 opts[k] = others[k]
550 del others[k]
559 del others[k]
551 if others:
560 if others:
552 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
561 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
553 % (cmd, ",".join(others)))
562 % (cmd, ",".join(others)))
554 return opts
563 return opts
555
564
556 def bundle1allowed(repo, action):
565 def bundle1allowed(repo, action):
557 """Whether a bundle1 operation is allowed from the server.
566 """Whether a bundle1 operation is allowed from the server.
558
567
559 Priority is:
568 Priority is:
560
569
561 1. server.bundle1gd.<action> (if generaldelta active)
570 1. server.bundle1gd.<action> (if generaldelta active)
562 2. server.bundle1.<action>
571 2. server.bundle1.<action>
563 3. server.bundle1gd (if generaldelta active)
572 3. server.bundle1gd (if generaldelta active)
564 4. server.bundle1
573 4. server.bundle1
565 """
574 """
566 ui = repo.ui
575 ui = repo.ui
567 gd = 'generaldelta' in repo.requirements
576 gd = 'generaldelta' in repo.requirements
568
577
569 if gd:
578 if gd:
570 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
579 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
571 if v is not None:
580 if v is not None:
572 return v
581 return v
573
582
574 v = ui.configbool('server', 'bundle1.%s' % action, None)
583 v = ui.configbool('server', 'bundle1.%s' % action, None)
575 if v is not None:
584 if v is not None:
576 return v
585 return v
577
586
578 if gd:
587 if gd:
579 v = ui.configbool('server', 'bundle1gd', None)
588 v = ui.configbool('server', 'bundle1gd', None)
580 if v is not None:
589 if v is not None:
581 return v
590 return v
582
591
583 return ui.configbool('server', 'bundle1', True)
592 return ui.configbool('server', 'bundle1', True)
584
593
585 # list of commands
594 # list of commands
586 commands = {}
595 commands = {}
587
596
588 def wireprotocommand(name, args=''):
597 def wireprotocommand(name, args=''):
589 """decorator for wire protocol command"""
598 """decorator for wire protocol command"""
590 def register(func):
599 def register(func):
591 commands[name] = (func, args)
600 commands[name] = (func, args)
592 return func
601 return func
593 return register
602 return register
594
603
595 @wireprotocommand('batch', 'cmds *')
604 @wireprotocommand('batch', 'cmds *')
596 def batch(repo, proto, cmds, others):
605 def batch(repo, proto, cmds, others):
597 repo = repo.filtered("served")
606 repo = repo.filtered("served")
598 res = []
607 res = []
599 for pair in cmds.split(';'):
608 for pair in cmds.split(';'):
600 op, args = pair.split(' ', 1)
609 op, args = pair.split(' ', 1)
601 vals = {}
610 vals = {}
602 for a in args.split(','):
611 for a in args.split(','):
603 if a:
612 if a:
604 n, v = a.split('=')
613 n, v = a.split('=')
605 vals[n] = unescapearg(v)
614 vals[n] = unescapearg(v)
606 func, spec = commands[op]
615 func, spec = commands[op]
607 if spec:
616 if spec:
608 keys = spec.split()
617 keys = spec.split()
609 data = {}
618 data = {}
610 for k in keys:
619 for k in keys:
611 if k == '*':
620 if k == '*':
612 star = {}
621 star = {}
613 for key in vals.keys():
622 for key in vals.keys():
614 if key not in keys:
623 if key not in keys:
615 star[key] = vals[key]
624 star[key] = vals[key]
616 data['*'] = star
625 data['*'] = star
617 else:
626 else:
618 data[k] = vals[k]
627 data[k] = vals[k]
619 result = func(repo, proto, *[data[k] for k in keys])
628 result = func(repo, proto, *[data[k] for k in keys])
620 else:
629 else:
621 result = func(repo, proto)
630 result = func(repo, proto)
622 if isinstance(result, ooberror):
631 if isinstance(result, ooberror):
623 return result
632 return result
624 res.append(escapearg(result))
633 res.append(escapearg(result))
625 return ';'.join(res)
634 return ';'.join(res)
626
635
627 @wireprotocommand('between', 'pairs')
636 @wireprotocommand('between', 'pairs')
628 def between(repo, proto, pairs):
637 def between(repo, proto, pairs):
629 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
638 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
630 r = []
639 r = []
631 for b in repo.between(pairs):
640 for b in repo.between(pairs):
632 r.append(encodelist(b) + "\n")
641 r.append(encodelist(b) + "\n")
633 return "".join(r)
642 return "".join(r)
634
643
635 @wireprotocommand('branchmap')
644 @wireprotocommand('branchmap')
636 def branchmap(repo, proto):
645 def branchmap(repo, proto):
637 branchmap = repo.branchmap()
646 branchmap = repo.branchmap()
638 heads = []
647 heads = []
639 for branch, nodes in branchmap.iteritems():
648 for branch, nodes in branchmap.iteritems():
640 branchname = urlreq.quote(encoding.fromlocal(branch))
649 branchname = urlreq.quote(encoding.fromlocal(branch))
641 branchnodes = encodelist(nodes)
650 branchnodes = encodelist(nodes)
642 heads.append('%s %s' % (branchname, branchnodes))
651 heads.append('%s %s' % (branchname, branchnodes))
643 return '\n'.join(heads)
652 return '\n'.join(heads)
644
653
645 @wireprotocommand('branches', 'nodes')
654 @wireprotocommand('branches', 'nodes')
646 def branches(repo, proto, nodes):
655 def branches(repo, proto, nodes):
647 nodes = decodelist(nodes)
656 nodes = decodelist(nodes)
648 r = []
657 r = []
649 for b in repo.branches(nodes):
658 for b in repo.branches(nodes):
650 r.append(encodelist(b) + "\n")
659 r.append(encodelist(b) + "\n")
651 return "".join(r)
660 return "".join(r)
652
661
653 @wireprotocommand('clonebundles', '')
662 @wireprotocommand('clonebundles', '')
654 def clonebundles(repo, proto):
663 def clonebundles(repo, proto):
655 """Server command for returning info for available bundles to seed clones.
664 """Server command for returning info for available bundles to seed clones.
656
665
657 Clients will parse this response and determine what bundle to fetch.
666 Clients will parse this response and determine what bundle to fetch.
658
667
659 Extensions may wrap this command to filter or dynamically emit data
668 Extensions may wrap this command to filter or dynamically emit data
660 depending on the request. e.g. you could advertise URLs for the closest
669 depending on the request. e.g. you could advertise URLs for the closest
661 data center given the client's IP address.
670 data center given the client's IP address.
662 """
671 """
663 return repo.opener.tryread('clonebundles.manifest')
672 return repo.opener.tryread('clonebundles.manifest')
664
673
665 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
674 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
666 'known', 'getbundle', 'unbundlehash', 'batch']
675 'known', 'getbundle', 'unbundlehash', 'batch']
667
676
668 def _capabilities(repo, proto):
677 def _capabilities(repo, proto):
669 """return a list of capabilities for a repo
678 """return a list of capabilities for a repo
670
679
671 This function exists to allow extensions to easily wrap capabilities
680 This function exists to allow extensions to easily wrap capabilities
672 computation
681 computation
673
682
674 - returns a lists: easy to alter
683 - returns a lists: easy to alter
675 - change done here will be propagated to both `capabilities` and `hello`
684 - change done here will be propagated to both `capabilities` and `hello`
676 command without any other action needed.
685 command without any other action needed.
677 """
686 """
678 # copy to prevent modification of the global list
687 # copy to prevent modification of the global list
679 caps = list(wireprotocaps)
688 caps = list(wireprotocaps)
680 if streamclone.allowservergeneration(repo.ui):
689 if streamclone.allowservergeneration(repo.ui):
681 if repo.ui.configbool('server', 'preferuncompressed', False):
690 if repo.ui.configbool('server', 'preferuncompressed', False):
682 caps.append('stream-preferred')
691 caps.append('stream-preferred')
683 requiredformats = repo.requirements & repo.supportedformats
692 requiredformats = repo.requirements & repo.supportedformats
684 # if our local revlogs are just revlogv1, add 'stream' cap
693 # if our local revlogs are just revlogv1, add 'stream' cap
685 if not requiredformats - set(('revlogv1',)):
694 if not requiredformats - set(('revlogv1',)):
686 caps.append('stream')
695 caps.append('stream')
687 # otherwise, add 'streamreqs' detailing our local revlog format
696 # otherwise, add 'streamreqs' detailing our local revlog format
688 else:
697 else:
689 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
698 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
690 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
699 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
691 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
700 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
692 caps.append('bundle2=' + urlreq.quote(capsblob))
701 caps.append('bundle2=' + urlreq.quote(capsblob))
693 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
702 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
694 caps.append(
703 caps.append(
695 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
704 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
696 if repo.ui.configbool('experimental', 'httppostargs', False):
705 if repo.ui.configbool('experimental', 'httppostargs', False):
697 caps.append('httppostargs')
706 caps.append('httppostargs')
698 return caps
707 return caps
699
708
700 # If you are writing an extension and consider wrapping this function. Wrap
709 # If you are writing an extension and consider wrapping this function. Wrap
701 # `_capabilities` instead.
710 # `_capabilities` instead.
702 @wireprotocommand('capabilities')
711 @wireprotocommand('capabilities')
703 def capabilities(repo, proto):
712 def capabilities(repo, proto):
704 return ' '.join(_capabilities(repo, proto))
713 return ' '.join(_capabilities(repo, proto))
705
714
706 @wireprotocommand('changegroup', 'roots')
715 @wireprotocommand('changegroup', 'roots')
707 def changegroup(repo, proto, roots):
716 def changegroup(repo, proto, roots):
708 nodes = decodelist(roots)
717 nodes = decodelist(roots)
709 cg = changegroupmod.changegroup(repo, nodes, 'serve')
718 cg = changegroupmod.changegroup(repo, nodes, 'serve')
710 return streamres(proto.groupchunks(cg))
719 return streamres(proto.groupchunks(cg))
711
720
712 @wireprotocommand('changegroupsubset', 'bases heads')
721 @wireprotocommand('changegroupsubset', 'bases heads')
713 def changegroupsubset(repo, proto, bases, heads):
722 def changegroupsubset(repo, proto, bases, heads):
714 bases = decodelist(bases)
723 bases = decodelist(bases)
715 heads = decodelist(heads)
724 heads = decodelist(heads)
716 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
725 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
717 return streamres(proto.groupchunks(cg))
726 return streamres(proto.groupchunks(cg))
718
727
719 @wireprotocommand('debugwireargs', 'one two *')
728 @wireprotocommand('debugwireargs', 'one two *')
720 def debugwireargs(repo, proto, one, two, others):
729 def debugwireargs(repo, proto, one, two, others):
721 # only accept optional args from the known set
730 # only accept optional args from the known set
722 opts = options('debugwireargs', ['three', 'four'], others)
731 opts = options('debugwireargs', ['three', 'four'], others)
723 return repo.debugwireargs(one, two, **opts)
732 return repo.debugwireargs(one, two, **opts)
724
733
725 # List of options accepted by getbundle.
734 # List of options accepted by getbundle.
726 #
735 #
727 # Meant to be extended by extensions. It is the extension's responsibility to
736 # Meant to be extended by extensions. It is the extension's responsibility to
728 # ensure such options are properly processed in exchange.getbundle.
737 # ensure such options are properly processed in exchange.getbundle.
729 gboptslist = ['heads', 'common', 'bundlecaps']
738 gboptslist = ['heads', 'common', 'bundlecaps']
730
739
731 @wireprotocommand('getbundle', '*')
740 @wireprotocommand('getbundle', '*')
732 def getbundle(repo, proto, others):
741 def getbundle(repo, proto, others):
733 opts = options('getbundle', gboptsmap.keys(), others)
742 opts = options('getbundle', gboptsmap.keys(), others)
734 for k, v in opts.iteritems():
743 for k, v in opts.iteritems():
735 keytype = gboptsmap[k]
744 keytype = gboptsmap[k]
736 if keytype == 'nodes':
745 if keytype == 'nodes':
737 opts[k] = decodelist(v)
746 opts[k] = decodelist(v)
738 elif keytype == 'csv':
747 elif keytype == 'csv':
739 opts[k] = list(v.split(','))
748 opts[k] = list(v.split(','))
740 elif keytype == 'scsv':
749 elif keytype == 'scsv':
741 opts[k] = set(v.split(','))
750 opts[k] = set(v.split(','))
742 elif keytype == 'boolean':
751 elif keytype == 'boolean':
743 # Client should serialize False as '0', which is a non-empty string
752 # Client should serialize False as '0', which is a non-empty string
744 # so it evaluates as a True bool.
753 # so it evaluates as a True bool.
745 if v == '0':
754 if v == '0':
746 opts[k] = False
755 opts[k] = False
747 else:
756 else:
748 opts[k] = bool(v)
757 opts[k] = bool(v)
749 elif keytype != 'plain':
758 elif keytype != 'plain':
750 raise KeyError('unknown getbundle option type %s'
759 raise KeyError('unknown getbundle option type %s'
751 % keytype)
760 % keytype)
752
761
753 if not bundle1allowed(repo, 'pull'):
762 if not bundle1allowed(repo, 'pull'):
754 if not exchange.bundle2requested(opts.get('bundlecaps')):
763 if not exchange.bundle2requested(opts.get('bundlecaps')):
755 return ooberror(bundle2required)
764 return ooberror(bundle2required)
756
765
757 cg = exchange.getbundle(repo, 'serve', **opts)
766 cg = exchange.getbundle(repo, 'serve', **opts)
758 return streamres(proto.groupchunks(cg))
767 return streamres(proto.groupchunks(cg))
759
768
760 @wireprotocommand('heads')
769 @wireprotocommand('heads')
761 def heads(repo, proto):
770 def heads(repo, proto):
762 h = repo.heads()
771 h = repo.heads()
763 return encodelist(h) + "\n"
772 return encodelist(h) + "\n"
764
773
765 @wireprotocommand('hello')
774 @wireprotocommand('hello')
766 def hello(repo, proto):
775 def hello(repo, proto):
767 '''the hello command returns a set of lines describing various
776 '''the hello command returns a set of lines describing various
768 interesting things about the server, in an RFC822-like format.
777 interesting things about the server, in an RFC822-like format.
769 Currently the only one defined is "capabilities", which
778 Currently the only one defined is "capabilities", which
770 consists of a line in the form:
779 consists of a line in the form:
771
780
772 capabilities: space separated list of tokens
781 capabilities: space separated list of tokens
773 '''
782 '''
774 return "capabilities: %s\n" % (capabilities(repo, proto))
783 return "capabilities: %s\n" % (capabilities(repo, proto))
775
784
776 @wireprotocommand('listkeys', 'namespace')
785 @wireprotocommand('listkeys', 'namespace')
777 def listkeys(repo, proto, namespace):
786 def listkeys(repo, proto, namespace):
778 d = repo.listkeys(encoding.tolocal(namespace)).items()
787 d = repo.listkeys(encoding.tolocal(namespace)).items()
779 return pushkeymod.encodekeys(d)
788 return pushkeymod.encodekeys(d)
780
789
781 @wireprotocommand('lookup', 'key')
790 @wireprotocommand('lookup', 'key')
782 def lookup(repo, proto, key):
791 def lookup(repo, proto, key):
783 try:
792 try:
784 k = encoding.tolocal(key)
793 k = encoding.tolocal(key)
785 c = repo[k]
794 c = repo[k]
786 r = c.hex()
795 r = c.hex()
787 success = 1
796 success = 1
788 except Exception as inst:
797 except Exception as inst:
789 r = str(inst)
798 r = str(inst)
790 success = 0
799 success = 0
791 return "%s %s\n" % (success, r)
800 return "%s %s\n" % (success, r)
792
801
793 @wireprotocommand('known', 'nodes *')
802 @wireprotocommand('known', 'nodes *')
794 def known(repo, proto, nodes, others):
803 def known(repo, proto, nodes, others):
795 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
804 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
796
805
797 @wireprotocommand('pushkey', 'namespace key old new')
806 @wireprotocommand('pushkey', 'namespace key old new')
798 def pushkey(repo, proto, namespace, key, old, new):
807 def pushkey(repo, proto, namespace, key, old, new):
799 # compatibility with pre-1.8 clients which were accidentally
808 # compatibility with pre-1.8 clients which were accidentally
800 # sending raw binary nodes rather than utf-8-encoded hex
809 # sending raw binary nodes rather than utf-8-encoded hex
801 if len(new) == 20 and new.encode('string-escape') != new:
810 if len(new) == 20 and new.encode('string-escape') != new:
802 # looks like it could be a binary node
811 # looks like it could be a binary node
803 try:
812 try:
804 new.decode('utf-8')
813 new.decode('utf-8')
805 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
814 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
806 except UnicodeDecodeError:
815 except UnicodeDecodeError:
807 pass # binary, leave unmodified
816 pass # binary, leave unmodified
808 else:
817 else:
809 new = encoding.tolocal(new) # normal path
818 new = encoding.tolocal(new) # normal path
810
819
811 if util.safehasattr(proto, 'restore'):
820 if util.safehasattr(proto, 'restore'):
812
821
813 proto.redirect()
822 proto.redirect()
814
823
815 try:
824 try:
816 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
825 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
817 encoding.tolocal(old), new) or False
826 encoding.tolocal(old), new) or False
818 except error.Abort:
827 except error.Abort:
819 r = False
828 r = False
820
829
821 output = proto.restore()
830 output = proto.restore()
822
831
823 return '%s\n%s' % (int(r), output)
832 return '%s\n%s' % (int(r), output)
824
833
825 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
834 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
826 encoding.tolocal(old), new)
835 encoding.tolocal(old), new)
827 return '%s\n' % int(r)
836 return '%s\n' % int(r)
828
837
829 @wireprotocommand('stream_out')
838 @wireprotocommand('stream_out')
830 def stream(repo, proto):
839 def stream(repo, proto):
831 '''If the server supports streaming clone, it advertises the "stream"
840 '''If the server supports streaming clone, it advertises the "stream"
832 capability with a value representing the version and flags of the repo
841 capability with a value representing the version and flags of the repo
833 it is serving. Client checks to see if it understands the format.
842 it is serving. Client checks to see if it understands the format.
834 '''
843 '''
835 if not streamclone.allowservergeneration(repo.ui):
844 if not streamclone.allowservergeneration(repo.ui):
836 return '1\n'
845 return '1\n'
837
846
838 def getstream(it):
847 def getstream(it):
839 yield '0\n'
848 yield '0\n'
840 for chunk in it:
849 for chunk in it:
841 yield chunk
850 yield chunk
842
851
843 try:
852 try:
844 # LockError may be raised before the first result is yielded. Don't
853 # LockError may be raised before the first result is yielded. Don't
845 # emit output until we're sure we got the lock successfully.
854 # emit output until we're sure we got the lock successfully.
846 it = streamclone.generatev1wireproto(repo)
855 it = streamclone.generatev1wireproto(repo)
847 return streamres(getstream(it))
856 return streamres(getstream(it))
848 except error.LockError:
857 except error.LockError:
849 return '2\n'
858 return '2\n'
850
859
851 @wireprotocommand('unbundle', 'heads')
860 @wireprotocommand('unbundle', 'heads')
852 def unbundle(repo, proto, heads):
861 def unbundle(repo, proto, heads):
853 their_heads = decodelist(heads)
862 their_heads = decodelist(heads)
854
863
855 try:
864 try:
856 proto.redirect()
865 proto.redirect()
857
866
858 exchange.check_heads(repo, their_heads, 'preparing changes')
867 exchange.check_heads(repo, their_heads, 'preparing changes')
859
868
860 # write bundle data to temporary file because it can be big
869 # write bundle data to temporary file because it can be big
861 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
870 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
862 fp = os.fdopen(fd, 'wb+')
871 fp = os.fdopen(fd, 'wb+')
863 r = 0
872 r = 0
864 try:
873 try:
865 proto.getfile(fp)
874 proto.getfile(fp)
866 fp.seek(0)
875 fp.seek(0)
867 gen = exchange.readbundle(repo.ui, fp, None)
876 gen = exchange.readbundle(repo.ui, fp, None)
868 if (isinstance(gen, changegroupmod.cg1unpacker)
877 if (isinstance(gen, changegroupmod.cg1unpacker)
869 and not bundle1allowed(repo, 'push')):
878 and not bundle1allowed(repo, 'push')):
870 return ooberror(bundle2required)
879 return ooberror(bundle2required)
871
880
872 r = exchange.unbundle(repo, gen, their_heads, 'serve',
881 r = exchange.unbundle(repo, gen, their_heads, 'serve',
873 proto._client())
882 proto._client())
874 if util.safehasattr(r, 'addpart'):
883 if util.safehasattr(r, 'addpart'):
875 # The return looks streamable, we are in the bundle2 case and
884 # The return looks streamable, we are in the bundle2 case and
876 # should return a stream.
885 # should return a stream.
877 return streamres(r.getchunks())
886 return streamres(r.getchunks())
878 return pushres(r)
887 return pushres(r)
879
888
880 finally:
889 finally:
881 fp.close()
890 fp.close()
882 os.unlink(tempname)
891 os.unlink(tempname)
883
892
884 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
893 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
885 # handle non-bundle2 case first
894 # handle non-bundle2 case first
886 if not getattr(exc, 'duringunbundle2', False):
895 if not getattr(exc, 'duringunbundle2', False):
887 try:
896 try:
888 raise
897 raise
889 except error.Abort:
898 except error.Abort:
890 # The old code we moved used sys.stderr directly.
899 # The old code we moved used sys.stderr directly.
891 # We did not change it to minimise code change.
900 # We did not change it to minimise code change.
892 # This need to be moved to something proper.
901 # This need to be moved to something proper.
893 # Feel free to do it.
902 # Feel free to do it.
894 sys.stderr.write("abort: %s\n" % exc)
903 sys.stderr.write("abort: %s\n" % exc)
895 return pushres(0)
904 return pushres(0)
896 except error.PushRaced:
905 except error.PushRaced:
897 return pusherr(str(exc))
906 return pusherr(str(exc))
898
907
899 bundler = bundle2.bundle20(repo.ui)
908 bundler = bundle2.bundle20(repo.ui)
900 for out in getattr(exc, '_bundle2salvagedoutput', ()):
909 for out in getattr(exc, '_bundle2salvagedoutput', ()):
901 bundler.addpart(out)
910 bundler.addpart(out)
902 try:
911 try:
903 try:
912 try:
904 raise
913 raise
905 except error.PushkeyFailed as exc:
914 except error.PushkeyFailed as exc:
906 # check client caps
915 # check client caps
907 remotecaps = getattr(exc, '_replycaps', None)
916 remotecaps = getattr(exc, '_replycaps', None)
908 if (remotecaps is not None
917 if (remotecaps is not None
909 and 'pushkey' not in remotecaps.get('error', ())):
918 and 'pushkey' not in remotecaps.get('error', ())):
910 # no support remote side, fallback to Abort handler.
919 # no support remote side, fallback to Abort handler.
911 raise
920 raise
912 part = bundler.newpart('error:pushkey')
921 part = bundler.newpart('error:pushkey')
913 part.addparam('in-reply-to', exc.partid)
922 part.addparam('in-reply-to', exc.partid)
914 if exc.namespace is not None:
923 if exc.namespace is not None:
915 part.addparam('namespace', exc.namespace, mandatory=False)
924 part.addparam('namespace', exc.namespace, mandatory=False)
916 if exc.key is not None:
925 if exc.key is not None:
917 part.addparam('key', exc.key, mandatory=False)
926 part.addparam('key', exc.key, mandatory=False)
918 if exc.new is not None:
927 if exc.new is not None:
919 part.addparam('new', exc.new, mandatory=False)
928 part.addparam('new', exc.new, mandatory=False)
920 if exc.old is not None:
929 if exc.old is not None:
921 part.addparam('old', exc.old, mandatory=False)
930 part.addparam('old', exc.old, mandatory=False)
922 if exc.ret is not None:
931 if exc.ret is not None:
923 part.addparam('ret', exc.ret, mandatory=False)
932 part.addparam('ret', exc.ret, mandatory=False)
924 except error.BundleValueError as exc:
933 except error.BundleValueError as exc:
925 errpart = bundler.newpart('error:unsupportedcontent')
934 errpart = bundler.newpart('error:unsupportedcontent')
926 if exc.parttype is not None:
935 if exc.parttype is not None:
927 errpart.addparam('parttype', exc.parttype)
936 errpart.addparam('parttype', exc.parttype)
928 if exc.params:
937 if exc.params:
929 errpart.addparam('params', '\0'.join(exc.params))
938 errpart.addparam('params', '\0'.join(exc.params))
930 except error.Abort as exc:
939 except error.Abort as exc:
931 manargs = [('message', str(exc))]
940 manargs = [('message', str(exc))]
932 advargs = []
941 advargs = []
933 if exc.hint is not None:
942 if exc.hint is not None:
934 advargs.append(('hint', exc.hint))
943 advargs.append(('hint', exc.hint))
935 bundler.addpart(bundle2.bundlepart('error:abort',
944 bundler.addpart(bundle2.bundlepart('error:abort',
936 manargs, advargs))
945 manargs, advargs))
937 except error.PushRaced as exc:
946 except error.PushRaced as exc:
938 bundler.newpart('error:pushraced', [('message', str(exc))])
947 bundler.newpart('error:pushraced', [('message', str(exc))])
939 return streamres(bundler.getchunks())
948 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now