##// END OF EJS Templates
wirepeer: rename confusing `source` parameter...
Augie Fackler -
r29706:7f6130c7 default
parent child Browse files
Show More
@@ -1,948 +1,952
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import hashlib
10 import hashlib
11 import itertools
11 import itertools
12 import os
12 import os
13 import sys
13 import sys
14 import tempfile
14 import tempfile
15
15
16 from .i18n import _
16 from .i18n import _
17 from .node import (
17 from .node import (
18 bin,
18 bin,
19 hex,
19 hex,
20 )
20 )
21
21
22 from . import (
22 from . import (
23 bundle2,
23 bundle2,
24 changegroup as changegroupmod,
24 changegroup as changegroupmod,
25 encoding,
25 encoding,
26 error,
26 error,
27 exchange,
27 exchange,
28 peer,
28 peer,
29 pushkey as pushkeymod,
29 pushkey as pushkeymod,
30 streamclone,
30 streamclone,
31 util,
31 util,
32 )
32 )
33
33
34 urlerr = util.urlerr
34 urlerr = util.urlerr
35 urlreq = util.urlreq
35 urlreq = util.urlreq
36
36
37 bundle2required = _(
37 bundle2required = _(
38 'incompatible Mercurial client; bundle2 required\n'
38 'incompatible Mercurial client; bundle2 required\n'
39 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
39 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
40
40
41 class abstractserverproto(object):
41 class abstractserverproto(object):
42 """abstract class that summarizes the protocol API
42 """abstract class that summarizes the protocol API
43
43
44 Used as reference and documentation.
44 Used as reference and documentation.
45 """
45 """
46
46
47 def getargs(self, args):
47 def getargs(self, args):
48 """return the value for arguments in <args>
48 """return the value for arguments in <args>
49
49
50 returns a list of values (same order as <args>)"""
50 returns a list of values (same order as <args>)"""
51 raise NotImplementedError()
51 raise NotImplementedError()
52
52
53 def getfile(self, fp):
53 def getfile(self, fp):
54 """write the whole content of a file into a file like object
54 """write the whole content of a file into a file like object
55
55
56 The file is in the form::
56 The file is in the form::
57
57
58 (<chunk-size>\n<chunk>)+0\n
58 (<chunk-size>\n<chunk>)+0\n
59
59
60 chunk size is the ascii version of the int.
60 chunk size is the ascii version of the int.
61 """
61 """
62 raise NotImplementedError()
62 raise NotImplementedError()
63
63
64 def redirect(self):
64 def redirect(self):
65 """may setup interception for stdout and stderr
65 """may setup interception for stdout and stderr
66
66
67 See also the `restore` method."""
67 See also the `restore` method."""
68 raise NotImplementedError()
68 raise NotImplementedError()
69
69
70 # If the `redirect` function does install interception, the `restore`
70 # If the `redirect` function does install interception, the `restore`
71 # function MUST be defined. If interception is not used, this function
71 # function MUST be defined. If interception is not used, this function
72 # MUST NOT be defined.
72 # MUST NOT be defined.
73 #
73 #
74 # left commented here on purpose
74 # left commented here on purpose
75 #
75 #
76 #def restore(self):
76 #def restore(self):
77 # """reinstall previous stdout and stderr and return intercepted stdout
77 # """reinstall previous stdout and stderr and return intercepted stdout
78 # """
78 # """
79 # raise NotImplementedError()
79 # raise NotImplementedError()
80
80
81 def groupchunks(self, cg):
81 def groupchunks(self, cg):
82 """return 4096 chunks from a changegroup object
82 """return 4096 chunks from a changegroup object
83
83
84 Some protocols may have compressed the contents."""
84 Some protocols may have compressed the contents."""
85 raise NotImplementedError()
85 raise NotImplementedError()
86
86
87 class remotebatch(peer.batcher):
87 class remotebatch(peer.batcher):
88 '''batches the queued calls; uses as few roundtrips as possible'''
88 '''batches the queued calls; uses as few roundtrips as possible'''
89 def __init__(self, remote):
89 def __init__(self, remote):
90 '''remote must support _submitbatch(encbatch) and
90 '''remote must support _submitbatch(encbatch) and
91 _submitone(op, encargs)'''
91 _submitone(op, encargs)'''
92 peer.batcher.__init__(self)
92 peer.batcher.__init__(self)
93 self.remote = remote
93 self.remote = remote
94 def submit(self):
94 def submit(self):
95 req, rsp = [], []
95 req, rsp = [], []
96 for name, args, opts, resref in self.calls:
96 for name, args, opts, resref in self.calls:
97 mtd = getattr(self.remote, name)
97 mtd = getattr(self.remote, name)
98 batchablefn = getattr(mtd, 'batchable', None)
98 batchablefn = getattr(mtd, 'batchable', None)
99 if batchablefn is not None:
99 if batchablefn is not None:
100 batchable = batchablefn(mtd.im_self, *args, **opts)
100 batchable = batchablefn(mtd.im_self, *args, **opts)
101 encargsorres, encresref = next(batchable)
101 encargsorres, encresref = next(batchable)
102 if encresref:
102 if encresref:
103 req.append((name, encargsorres,))
103 req.append((name, encargsorres,))
104 rsp.append((batchable, encresref, resref,))
104 rsp.append((batchable, encresref, resref,))
105 else:
105 else:
106 resref.set(encargsorres)
106 resref.set(encargsorres)
107 else:
107 else:
108 if req:
108 if req:
109 self._submitreq(req, rsp)
109 self._submitreq(req, rsp)
110 req, rsp = [], []
110 req, rsp = [], []
111 resref.set(mtd(*args, **opts))
111 resref.set(mtd(*args, **opts))
112 if req:
112 if req:
113 self._submitreq(req, rsp)
113 self._submitreq(req, rsp)
114 def _submitreq(self, req, rsp):
114 def _submitreq(self, req, rsp):
115 encresults = self.remote._submitbatch(req)
115 encresults = self.remote._submitbatch(req)
116 for encres, r in zip(encresults, rsp):
116 for encres, r in zip(encresults, rsp):
117 batchable, encresref, resref = r
117 batchable, encresref, resref = r
118 encresref.set(encres)
118 encresref.set(encres)
119 resref.set(next(batchable))
119 resref.set(next(batchable))
120
120
121 class remoteiterbatcher(peer.iterbatcher):
121 class remoteiterbatcher(peer.iterbatcher):
122 def __init__(self, remote):
122 def __init__(self, remote):
123 super(remoteiterbatcher, self).__init__()
123 super(remoteiterbatcher, self).__init__()
124 self._remote = remote
124 self._remote = remote
125
125
126 def __getattr__(self, name):
126 def __getattr__(self, name):
127 if not getattr(self._remote, name, False):
127 if not getattr(self._remote, name, False):
128 raise AttributeError(
128 raise AttributeError(
129 'Attempted to iterbatch non-batchable call to %r' % name)
129 'Attempted to iterbatch non-batchable call to %r' % name)
130 return super(remoteiterbatcher, self).__getattr__(name)
130 return super(remoteiterbatcher, self).__getattr__(name)
131
131
132 def submit(self):
132 def submit(self):
133 """Break the batch request into many patch calls and pipeline them.
133 """Break the batch request into many patch calls and pipeline them.
134
134
135 This is mostly valuable over http where request sizes can be
135 This is mostly valuable over http where request sizes can be
136 limited, but can be used in other places as well.
136 limited, but can be used in other places as well.
137 """
137 """
138 req, rsp = [], []
138 req, rsp = [], []
139 for name, args, opts, resref in self.calls:
139 for name, args, opts, resref in self.calls:
140 mtd = getattr(self._remote, name)
140 mtd = getattr(self._remote, name)
141 batchable = mtd.batchable(mtd.im_self, *args, **opts)
141 batchable = mtd.batchable(mtd.im_self, *args, **opts)
142 encargsorres, encresref = next(batchable)
142 encargsorres, encresref = next(batchable)
143 assert encresref
143 assert encresref
144 req.append((name, encargsorres))
144 req.append((name, encargsorres))
145 rsp.append((batchable, encresref))
145 rsp.append((batchable, encresref))
146 if req:
146 if req:
147 self._resultiter = self._remote._submitbatch(req)
147 self._resultiter = self._remote._submitbatch(req)
148 self._rsp = rsp
148 self._rsp = rsp
149
149
150 def results(self):
150 def results(self):
151 for (batchable, encresref), encres in itertools.izip(
151 for (batchable, encresref), encres in itertools.izip(
152 self._rsp, self._resultiter):
152 self._rsp, self._resultiter):
153 encresref.set(encres)
153 encresref.set(encres)
154 yield next(batchable)
154 yield next(batchable)
155
155
156 # Forward a couple of names from peer to make wireproto interactions
156 # Forward a couple of names from peer to make wireproto interactions
157 # slightly more sensible.
157 # slightly more sensible.
158 batchable = peer.batchable
158 batchable = peer.batchable
159 future = peer.future
159 future = peer.future
160
160
161 # list of nodes encoding / decoding
161 # list of nodes encoding / decoding
162
162
163 def decodelist(l, sep=' '):
163 def decodelist(l, sep=' '):
164 if l:
164 if l:
165 return map(bin, l.split(sep))
165 return map(bin, l.split(sep))
166 return []
166 return []
167
167
168 def encodelist(l, sep=' '):
168 def encodelist(l, sep=' '):
169 try:
169 try:
170 return sep.join(map(hex, l))
170 return sep.join(map(hex, l))
171 except TypeError:
171 except TypeError:
172 raise
172 raise
173
173
174 # batched call argument encoding
174 # batched call argument encoding
175
175
176 def escapearg(plain):
176 def escapearg(plain):
177 return (plain
177 return (plain
178 .replace(':', ':c')
178 .replace(':', ':c')
179 .replace(',', ':o')
179 .replace(',', ':o')
180 .replace(';', ':s')
180 .replace(';', ':s')
181 .replace('=', ':e'))
181 .replace('=', ':e'))
182
182
183 def unescapearg(escaped):
183 def unescapearg(escaped):
184 return (escaped
184 return (escaped
185 .replace(':e', '=')
185 .replace(':e', '=')
186 .replace(':s', ';')
186 .replace(':s', ';')
187 .replace(':o', ',')
187 .replace(':o', ',')
188 .replace(':c', ':'))
188 .replace(':c', ':'))
189
189
190 # mapping of options accepted by getbundle and their types
190 # mapping of options accepted by getbundle and their types
191 #
191 #
192 # Meant to be extended by extensions. It is extensions responsibility to ensure
192 # Meant to be extended by extensions. It is extensions responsibility to ensure
193 # such options are properly processed in exchange.getbundle.
193 # such options are properly processed in exchange.getbundle.
194 #
194 #
195 # supported types are:
195 # supported types are:
196 #
196 #
197 # :nodes: list of binary nodes
197 # :nodes: list of binary nodes
198 # :csv: list of comma-separated values
198 # :csv: list of comma-separated values
199 # :scsv: list of comma-separated values return as set
199 # :scsv: list of comma-separated values return as set
200 # :plain: string with no transformation needed.
200 # :plain: string with no transformation needed.
201 gboptsmap = {'heads': 'nodes',
201 gboptsmap = {'heads': 'nodes',
202 'common': 'nodes',
202 'common': 'nodes',
203 'obsmarkers': 'boolean',
203 'obsmarkers': 'boolean',
204 'bundlecaps': 'scsv',
204 'bundlecaps': 'scsv',
205 'listkeys': 'csv',
205 'listkeys': 'csv',
206 'cg': 'boolean',
206 'cg': 'boolean',
207 'cbattempted': 'boolean'}
207 'cbattempted': 'boolean'}
208
208
209 # client side
209 # client side
210
210
211 class wirepeer(peer.peerrepository):
211 class wirepeer(peer.peerrepository):
212 """Client-side interface for communicating with a peer repository.
212 """Client-side interface for communicating with a peer repository.
213
213
214 Methods commonly call wire protocol commands of the same name.
214 Methods commonly call wire protocol commands of the same name.
215
215
216 See also httppeer.py and sshpeer.py for protocol-specific
216 See also httppeer.py and sshpeer.py for protocol-specific
217 implementations of this interface.
217 implementations of this interface.
218 """
218 """
219 def batch(self):
219 def batch(self):
220 if self.capable('batch'):
220 if self.capable('batch'):
221 return remotebatch(self)
221 return remotebatch(self)
222 else:
222 else:
223 return peer.localbatch(self)
223 return peer.localbatch(self)
224 def _submitbatch(self, req):
224 def _submitbatch(self, req):
225 """run batch request <req> on the server
225 """run batch request <req> on the server
226
226
227 Returns an iterator of the raw responses from the server.
227 Returns an iterator of the raw responses from the server.
228 """
228 """
229 cmds = []
229 cmds = []
230 for op, argsdict in req:
230 for op, argsdict in req:
231 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
231 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
232 for k, v in argsdict.iteritems())
232 for k, v in argsdict.iteritems())
233 cmds.append('%s %s' % (op, args))
233 cmds.append('%s %s' % (op, args))
234 rsp = self._callstream("batch", cmds=';'.join(cmds))
234 rsp = self._callstream("batch", cmds=';'.join(cmds))
235 chunk = rsp.read(1024)
235 chunk = rsp.read(1024)
236 work = [chunk]
236 work = [chunk]
237 while chunk:
237 while chunk:
238 while ';' not in chunk and chunk:
238 while ';' not in chunk and chunk:
239 chunk = rsp.read(1024)
239 chunk = rsp.read(1024)
240 work.append(chunk)
240 work.append(chunk)
241 merged = ''.join(work)
241 merged = ''.join(work)
242 while ';' in merged:
242 while ';' in merged:
243 one, merged = merged.split(';', 1)
243 one, merged = merged.split(';', 1)
244 yield unescapearg(one)
244 yield unescapearg(one)
245 chunk = rsp.read(1024)
245 chunk = rsp.read(1024)
246 work = [merged, chunk]
246 work = [merged, chunk]
247 yield unescapearg(''.join(work))
247 yield unescapearg(''.join(work))
248
248
249 def _submitone(self, op, args):
249 def _submitone(self, op, args):
250 return self._call(op, **args)
250 return self._call(op, **args)
251
251
252 def iterbatch(self):
252 def iterbatch(self):
253 return remoteiterbatcher(self)
253 return remoteiterbatcher(self)
254
254
255 @batchable
255 @batchable
256 def lookup(self, key):
256 def lookup(self, key):
257 self.requirecap('lookup', _('look up remote revision'))
257 self.requirecap('lookup', _('look up remote revision'))
258 f = future()
258 f = future()
259 yield {'key': encoding.fromlocal(key)}, f
259 yield {'key': encoding.fromlocal(key)}, f
260 d = f.value
260 d = f.value
261 success, data = d[:-1].split(" ", 1)
261 success, data = d[:-1].split(" ", 1)
262 if int(success):
262 if int(success):
263 yield bin(data)
263 yield bin(data)
264 self._abort(error.RepoError(data))
264 self._abort(error.RepoError(data))
265
265
266 @batchable
266 @batchable
267 def heads(self):
267 def heads(self):
268 f = future()
268 f = future()
269 yield {}, f
269 yield {}, f
270 d = f.value
270 d = f.value
271 try:
271 try:
272 yield decodelist(d[:-1])
272 yield decodelist(d[:-1])
273 except ValueError:
273 except ValueError:
274 self._abort(error.ResponseError(_("unexpected response:"), d))
274 self._abort(error.ResponseError(_("unexpected response:"), d))
275
275
276 @batchable
276 @batchable
277 def known(self, nodes):
277 def known(self, nodes):
278 f = future()
278 f = future()
279 yield {'nodes': encodelist(nodes)}, f
279 yield {'nodes': encodelist(nodes)}, f
280 d = f.value
280 d = f.value
281 try:
281 try:
282 yield [bool(int(b)) for b in d]
282 yield [bool(int(b)) for b in d]
283 except ValueError:
283 except ValueError:
284 self._abort(error.ResponseError(_("unexpected response:"), d))
284 self._abort(error.ResponseError(_("unexpected response:"), d))
285
285
286 @batchable
286 @batchable
287 def branchmap(self):
287 def branchmap(self):
288 f = future()
288 f = future()
289 yield {}, f
289 yield {}, f
290 d = f.value
290 d = f.value
291 try:
291 try:
292 branchmap = {}
292 branchmap = {}
293 for branchpart in d.splitlines():
293 for branchpart in d.splitlines():
294 branchname, branchheads = branchpart.split(' ', 1)
294 branchname, branchheads = branchpart.split(' ', 1)
295 branchname = encoding.tolocal(urlreq.unquote(branchname))
295 branchname = encoding.tolocal(urlreq.unquote(branchname))
296 branchheads = decodelist(branchheads)
296 branchheads = decodelist(branchheads)
297 branchmap[branchname] = branchheads
297 branchmap[branchname] = branchheads
298 yield branchmap
298 yield branchmap
299 except TypeError:
299 except TypeError:
300 self._abort(error.ResponseError(_("unexpected response:"), d))
300 self._abort(error.ResponseError(_("unexpected response:"), d))
301
301
302 def branches(self, nodes):
302 def branches(self, nodes):
303 n = encodelist(nodes)
303 n = encodelist(nodes)
304 d = self._call("branches", nodes=n)
304 d = self._call("branches", nodes=n)
305 try:
305 try:
306 br = [tuple(decodelist(b)) for b in d.splitlines()]
306 br = [tuple(decodelist(b)) for b in d.splitlines()]
307 return br
307 return br
308 except ValueError:
308 except ValueError:
309 self._abort(error.ResponseError(_("unexpected response:"), d))
309 self._abort(error.ResponseError(_("unexpected response:"), d))
310
310
311 def between(self, pairs):
311 def between(self, pairs):
312 batch = 8 # avoid giant requests
312 batch = 8 # avoid giant requests
313 r = []
313 r = []
314 for i in xrange(0, len(pairs), batch):
314 for i in xrange(0, len(pairs), batch):
315 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
315 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
316 d = self._call("between", pairs=n)
316 d = self._call("between", pairs=n)
317 try:
317 try:
318 r.extend(l and decodelist(l) or [] for l in d.splitlines())
318 r.extend(l and decodelist(l) or [] for l in d.splitlines())
319 except ValueError:
319 except ValueError:
320 self._abort(error.ResponseError(_("unexpected response:"), d))
320 self._abort(error.ResponseError(_("unexpected response:"), d))
321 return r
321 return r
322
322
323 @batchable
323 @batchable
324 def pushkey(self, namespace, key, old, new):
324 def pushkey(self, namespace, key, old, new):
325 if not self.capable('pushkey'):
325 if not self.capable('pushkey'):
326 yield False, None
326 yield False, None
327 f = future()
327 f = future()
328 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
328 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
329 yield {'namespace': encoding.fromlocal(namespace),
329 yield {'namespace': encoding.fromlocal(namespace),
330 'key': encoding.fromlocal(key),
330 'key': encoding.fromlocal(key),
331 'old': encoding.fromlocal(old),
331 'old': encoding.fromlocal(old),
332 'new': encoding.fromlocal(new)}, f
332 'new': encoding.fromlocal(new)}, f
333 d = f.value
333 d = f.value
334 d, output = d.split('\n', 1)
334 d, output = d.split('\n', 1)
335 try:
335 try:
336 d = bool(int(d))
336 d = bool(int(d))
337 except ValueError:
337 except ValueError:
338 raise error.ResponseError(
338 raise error.ResponseError(
339 _('push failed (unexpected response):'), d)
339 _('push failed (unexpected response):'), d)
340 for l in output.splitlines(True):
340 for l in output.splitlines(True):
341 self.ui.status(_('remote: '), l)
341 self.ui.status(_('remote: '), l)
342 yield d
342 yield d
343
343
344 @batchable
344 @batchable
345 def listkeys(self, namespace):
345 def listkeys(self, namespace):
346 if not self.capable('pushkey'):
346 if not self.capable('pushkey'):
347 yield {}, None
347 yield {}, None
348 f = future()
348 f = future()
349 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
349 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
350 yield {'namespace': encoding.fromlocal(namespace)}, f
350 yield {'namespace': encoding.fromlocal(namespace)}, f
351 d = f.value
351 d = f.value
352 self.ui.debug('received listkey for "%s": %i bytes\n'
352 self.ui.debug('received listkey for "%s": %i bytes\n'
353 % (namespace, len(d)))
353 % (namespace, len(d)))
354 yield pushkeymod.decodekeys(d)
354 yield pushkeymod.decodekeys(d)
355
355
356 def stream_out(self):
356 def stream_out(self):
357 return self._callstream('stream_out')
357 return self._callstream('stream_out')
358
358
359 def changegroup(self, nodes, kind):
359 def changegroup(self, nodes, kind):
360 n = encodelist(nodes)
360 n = encodelist(nodes)
361 f = self._callcompressable("changegroup", roots=n)
361 f = self._callcompressable("changegroup", roots=n)
362 return changegroupmod.cg1unpacker(f, 'UN')
362 return changegroupmod.cg1unpacker(f, 'UN')
363
363
364 def changegroupsubset(self, bases, heads, kind):
364 def changegroupsubset(self, bases, heads, kind):
365 self.requirecap('changegroupsubset', _('look up remote changes'))
365 self.requirecap('changegroupsubset', _('look up remote changes'))
366 bases = encodelist(bases)
366 bases = encodelist(bases)
367 heads = encodelist(heads)
367 heads = encodelist(heads)
368 f = self._callcompressable("changegroupsubset",
368 f = self._callcompressable("changegroupsubset",
369 bases=bases, heads=heads)
369 bases=bases, heads=heads)
370 return changegroupmod.cg1unpacker(f, 'UN')
370 return changegroupmod.cg1unpacker(f, 'UN')
371
371
372 def getbundle(self, source, **kwargs):
372 def getbundle(self, source, **kwargs):
373 self.requirecap('getbundle', _('look up remote changes'))
373 self.requirecap('getbundle', _('look up remote changes'))
374 opts = {}
374 opts = {}
375 bundlecaps = kwargs.get('bundlecaps')
375 bundlecaps = kwargs.get('bundlecaps')
376 if bundlecaps is not None:
376 if bundlecaps is not None:
377 kwargs['bundlecaps'] = sorted(bundlecaps)
377 kwargs['bundlecaps'] = sorted(bundlecaps)
378 else:
378 else:
379 bundlecaps = () # kwargs could have it to None
379 bundlecaps = () # kwargs could have it to None
380 for key, value in kwargs.iteritems():
380 for key, value in kwargs.iteritems():
381 if value is None:
381 if value is None:
382 continue
382 continue
383 keytype = gboptsmap.get(key)
383 keytype = gboptsmap.get(key)
384 if keytype is None:
384 if keytype is None:
385 assert False, 'unexpected'
385 assert False, 'unexpected'
386 elif keytype == 'nodes':
386 elif keytype == 'nodes':
387 value = encodelist(value)
387 value = encodelist(value)
388 elif keytype in ('csv', 'scsv'):
388 elif keytype in ('csv', 'scsv'):
389 value = ','.join(value)
389 value = ','.join(value)
390 elif keytype == 'boolean':
390 elif keytype == 'boolean':
391 value = '%i' % bool(value)
391 value = '%i' % bool(value)
392 elif keytype != 'plain':
392 elif keytype != 'plain':
393 raise KeyError('unknown getbundle option type %s'
393 raise KeyError('unknown getbundle option type %s'
394 % keytype)
394 % keytype)
395 opts[key] = value
395 opts[key] = value
396 f = self._callcompressable("getbundle", **opts)
396 f = self._callcompressable("getbundle", **opts)
397 if any((cap.startswith('HG2') for cap in bundlecaps)):
397 if any((cap.startswith('HG2') for cap in bundlecaps)):
398 return bundle2.getunbundler(self.ui, f)
398 return bundle2.getunbundler(self.ui, f)
399 else:
399 else:
400 return changegroupmod.cg1unpacker(f, 'UN')
400 return changegroupmod.cg1unpacker(f, 'UN')
401
401
402 def unbundle(self, cg, heads, source):
402 def unbundle(self, cg, heads, url):
403 '''Send cg (a readable file-like object representing the
403 '''Send cg (a readable file-like object representing the
404 changegroup to push, typically a chunkbuffer object) to the
404 changegroup to push, typically a chunkbuffer object) to the
405 remote server as a bundle.
405 remote server as a bundle.
406
406
407 When pushing a bundle10 stream, return an integer indicating the
407 When pushing a bundle10 stream, return an integer indicating the
408 result of the push (see localrepository.addchangegroup()).
408 result of the push (see localrepository.addchangegroup()).
409
409
410 When pushing a bundle20 stream, return a bundle20 stream.'''
410 When pushing a bundle20 stream, return a bundle20 stream.
411
412 `url` is the url the client thinks it's pushing to, which is
413 visible to hooks.
414 '''
411
415
412 if heads != ['force'] and self.capable('unbundlehash'):
416 if heads != ['force'] and self.capable('unbundlehash'):
413 heads = encodelist(['hashed',
417 heads = encodelist(['hashed',
414 hashlib.sha1(''.join(sorted(heads))).digest()])
418 hashlib.sha1(''.join(sorted(heads))).digest()])
415 else:
419 else:
416 heads = encodelist(heads)
420 heads = encodelist(heads)
417
421
418 if util.safehasattr(cg, 'deltaheader'):
422 if util.safehasattr(cg, 'deltaheader'):
419 # this a bundle10, do the old style call sequence
423 # this a bundle10, do the old style call sequence
420 ret, output = self._callpush("unbundle", cg, heads=heads)
424 ret, output = self._callpush("unbundle", cg, heads=heads)
421 if ret == "":
425 if ret == "":
422 raise error.ResponseError(
426 raise error.ResponseError(
423 _('push failed:'), output)
427 _('push failed:'), output)
424 try:
428 try:
425 ret = int(ret)
429 ret = int(ret)
426 except ValueError:
430 except ValueError:
427 raise error.ResponseError(
431 raise error.ResponseError(
428 _('push failed (unexpected response):'), ret)
432 _('push failed (unexpected response):'), ret)
429
433
430 for l in output.splitlines(True):
434 for l in output.splitlines(True):
431 self.ui.status(_('remote: '), l)
435 self.ui.status(_('remote: '), l)
432 else:
436 else:
433 # bundle2 push. Send a stream, fetch a stream.
437 # bundle2 push. Send a stream, fetch a stream.
434 stream = self._calltwowaystream('unbundle', cg, heads=heads)
438 stream = self._calltwowaystream('unbundle', cg, heads=heads)
435 ret = bundle2.getunbundler(self.ui, stream)
439 ret = bundle2.getunbundler(self.ui, stream)
436 return ret
440 return ret
437
441
438 def debugwireargs(self, one, two, three=None, four=None, five=None):
442 def debugwireargs(self, one, two, three=None, four=None, five=None):
439 # don't pass optional arguments left at their default value
443 # don't pass optional arguments left at their default value
440 opts = {}
444 opts = {}
441 if three is not None:
445 if three is not None:
442 opts['three'] = three
446 opts['three'] = three
443 if four is not None:
447 if four is not None:
444 opts['four'] = four
448 opts['four'] = four
445 return self._call('debugwireargs', one=one, two=two, **opts)
449 return self._call('debugwireargs', one=one, two=two, **opts)
446
450
447 def _call(self, cmd, **args):
451 def _call(self, cmd, **args):
448 """execute <cmd> on the server
452 """execute <cmd> on the server
449
453
450 The command is expected to return a simple string.
454 The command is expected to return a simple string.
451
455
452 returns the server reply as a string."""
456 returns the server reply as a string."""
453 raise NotImplementedError()
457 raise NotImplementedError()
454
458
455 def _callstream(self, cmd, **args):
459 def _callstream(self, cmd, **args):
456 """execute <cmd> on the server
460 """execute <cmd> on the server
457
461
458 The command is expected to return a stream. Note that if the
462 The command is expected to return a stream. Note that if the
459 command doesn't return a stream, _callstream behaves
463 command doesn't return a stream, _callstream behaves
460 differently for ssh and http peers.
464 differently for ssh and http peers.
461
465
462 returns the server reply as a file like object.
466 returns the server reply as a file like object.
463 """
467 """
464 raise NotImplementedError()
468 raise NotImplementedError()
465
469
466 def _callcompressable(self, cmd, **args):
470 def _callcompressable(self, cmd, **args):
467 """execute <cmd> on the server
471 """execute <cmd> on the server
468
472
469 The command is expected to return a stream.
473 The command is expected to return a stream.
470
474
471 The stream may have been compressed in some implementations. This
475 The stream may have been compressed in some implementations. This
472 function takes care of the decompression. This is the only difference
476 function takes care of the decompression. This is the only difference
473 with _callstream.
477 with _callstream.
474
478
475 returns the server reply as a file like object.
479 returns the server reply as a file like object.
476 """
480 """
477 raise NotImplementedError()
481 raise NotImplementedError()
478
482
479 def _callpush(self, cmd, fp, **args):
483 def _callpush(self, cmd, fp, **args):
480 """execute a <cmd> on server
484 """execute a <cmd> on server
481
485
482 The command is expected to be related to a push. Push has a special
486 The command is expected to be related to a push. Push has a special
483 return method.
487 return method.
484
488
485 returns the server reply as a (ret, output) tuple. ret is either
489 returns the server reply as a (ret, output) tuple. ret is either
486 empty (error) or a stringified int.
490 empty (error) or a stringified int.
487 """
491 """
488 raise NotImplementedError()
492 raise NotImplementedError()
489
493
490 def _calltwowaystream(self, cmd, fp, **args):
494 def _calltwowaystream(self, cmd, fp, **args):
491 """execute <cmd> on server
495 """execute <cmd> on server
492
496
493 The command will send a stream to the server and get a stream in reply.
497 The command will send a stream to the server and get a stream in reply.
494 """
498 """
495 raise NotImplementedError()
499 raise NotImplementedError()
496
500
497 def _abort(self, exception):
501 def _abort(self, exception):
498 """clearly abort the wire protocol connection and raise the exception
502 """clearly abort the wire protocol connection and raise the exception
499 """
503 """
500 raise NotImplementedError()
504 raise NotImplementedError()
501
505
502 # server side
506 # server side
503
507
504 # wire protocol command can either return a string or one of these classes.
508 # wire protocol command can either return a string or one of these classes.
505 class streamres(object):
509 class streamres(object):
506 """wireproto reply: binary stream
510 """wireproto reply: binary stream
507
511
508 The call was successful and the result is a stream.
512 The call was successful and the result is a stream.
509 Iterate on the `self.gen` attribute to retrieve chunks.
513 Iterate on the `self.gen` attribute to retrieve chunks.
510 """
514 """
511 def __init__(self, gen):
515 def __init__(self, gen):
512 self.gen = gen
516 self.gen = gen
513
517
514 class pushres(object):
518 class pushres(object):
515 """wireproto reply: success with simple integer return
519 """wireproto reply: success with simple integer return
516
520
517 The call was successful and returned an integer contained in `self.res`.
521 The call was successful and returned an integer contained in `self.res`.
518 """
522 """
519 def __init__(self, res):
523 def __init__(self, res):
520 self.res = res
524 self.res = res
521
525
522 class pusherr(object):
526 class pusherr(object):
523 """wireproto reply: failure
527 """wireproto reply: failure
524
528
525 The call failed. The `self.res` attribute contains the error message.
529 The call failed. The `self.res` attribute contains the error message.
526 """
530 """
527 def __init__(self, res):
531 def __init__(self, res):
528 self.res = res
532 self.res = res
529
533
530 class ooberror(object):
534 class ooberror(object):
531 """wireproto reply: failure of a batch of operation
535 """wireproto reply: failure of a batch of operation
532
536
533 Something failed during a batch call. The error message is stored in
537 Something failed during a batch call. The error message is stored in
534 `self.message`.
538 `self.message`.
535 """
539 """
536 def __init__(self, message):
540 def __init__(self, message):
537 self.message = message
541 self.message = message
538
542
539 def getdispatchrepo(repo, proto, command):
543 def getdispatchrepo(repo, proto, command):
540 """Obtain the repo used for processing wire protocol commands.
544 """Obtain the repo used for processing wire protocol commands.
541
545
542 The intent of this function is to serve as a monkeypatch point for
546 The intent of this function is to serve as a monkeypatch point for
543 extensions that need commands to operate on different repo views under
547 extensions that need commands to operate on different repo views under
544 specialized circumstances.
548 specialized circumstances.
545 """
549 """
546 return repo.filtered('served')
550 return repo.filtered('served')
547
551
548 def dispatch(repo, proto, command):
552 def dispatch(repo, proto, command):
549 repo = getdispatchrepo(repo, proto, command)
553 repo = getdispatchrepo(repo, proto, command)
550 func, spec = commands[command]
554 func, spec = commands[command]
551 args = proto.getargs(spec)
555 args = proto.getargs(spec)
552 return func(repo, proto, *args)
556 return func(repo, proto, *args)
553
557
554 def options(cmd, keys, others):
558 def options(cmd, keys, others):
555 opts = {}
559 opts = {}
556 for k in keys:
560 for k in keys:
557 if k in others:
561 if k in others:
558 opts[k] = others[k]
562 opts[k] = others[k]
559 del others[k]
563 del others[k]
560 if others:
564 if others:
561 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
565 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
562 % (cmd, ",".join(others)))
566 % (cmd, ",".join(others)))
563 return opts
567 return opts
564
568
565 def bundle1allowed(repo, action):
569 def bundle1allowed(repo, action):
566 """Whether a bundle1 operation is allowed from the server.
570 """Whether a bundle1 operation is allowed from the server.
567
571
568 Priority is:
572 Priority is:
569
573
570 1. server.bundle1gd.<action> (if generaldelta active)
574 1. server.bundle1gd.<action> (if generaldelta active)
571 2. server.bundle1.<action>
575 2. server.bundle1.<action>
572 3. server.bundle1gd (if generaldelta active)
576 3. server.bundle1gd (if generaldelta active)
573 4. server.bundle1
577 4. server.bundle1
574 """
578 """
575 ui = repo.ui
579 ui = repo.ui
576 gd = 'generaldelta' in repo.requirements
580 gd = 'generaldelta' in repo.requirements
577
581
578 if gd:
582 if gd:
579 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
583 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
580 if v is not None:
584 if v is not None:
581 return v
585 return v
582
586
583 v = ui.configbool('server', 'bundle1.%s' % action, None)
587 v = ui.configbool('server', 'bundle1.%s' % action, None)
584 if v is not None:
588 if v is not None:
585 return v
589 return v
586
590
587 if gd:
591 if gd:
588 v = ui.configbool('server', 'bundle1gd', None)
592 v = ui.configbool('server', 'bundle1gd', None)
589 if v is not None:
593 if v is not None:
590 return v
594 return v
591
595
592 return ui.configbool('server', 'bundle1', True)
596 return ui.configbool('server', 'bundle1', True)
593
597
594 # list of commands
598 # list of commands
595 commands = {}
599 commands = {}
596
600
597 def wireprotocommand(name, args=''):
601 def wireprotocommand(name, args=''):
598 """decorator for wire protocol command"""
602 """decorator for wire protocol command"""
599 def register(func):
603 def register(func):
600 commands[name] = (func, args)
604 commands[name] = (func, args)
601 return func
605 return func
602 return register
606 return register
603
607
604 @wireprotocommand('batch', 'cmds *')
608 @wireprotocommand('batch', 'cmds *')
605 def batch(repo, proto, cmds, others):
609 def batch(repo, proto, cmds, others):
606 repo = repo.filtered("served")
610 repo = repo.filtered("served")
607 res = []
611 res = []
608 for pair in cmds.split(';'):
612 for pair in cmds.split(';'):
609 op, args = pair.split(' ', 1)
613 op, args = pair.split(' ', 1)
610 vals = {}
614 vals = {}
611 for a in args.split(','):
615 for a in args.split(','):
612 if a:
616 if a:
613 n, v = a.split('=')
617 n, v = a.split('=')
614 vals[n] = unescapearg(v)
618 vals[n] = unescapearg(v)
615 func, spec = commands[op]
619 func, spec = commands[op]
616 if spec:
620 if spec:
617 keys = spec.split()
621 keys = spec.split()
618 data = {}
622 data = {}
619 for k in keys:
623 for k in keys:
620 if k == '*':
624 if k == '*':
621 star = {}
625 star = {}
622 for key in vals.keys():
626 for key in vals.keys():
623 if key not in keys:
627 if key not in keys:
624 star[key] = vals[key]
628 star[key] = vals[key]
625 data['*'] = star
629 data['*'] = star
626 else:
630 else:
627 data[k] = vals[k]
631 data[k] = vals[k]
628 result = func(repo, proto, *[data[k] for k in keys])
632 result = func(repo, proto, *[data[k] for k in keys])
629 else:
633 else:
630 result = func(repo, proto)
634 result = func(repo, proto)
631 if isinstance(result, ooberror):
635 if isinstance(result, ooberror):
632 return result
636 return result
633 res.append(escapearg(result))
637 res.append(escapearg(result))
634 return ';'.join(res)
638 return ';'.join(res)
635
639
636 @wireprotocommand('between', 'pairs')
640 @wireprotocommand('between', 'pairs')
637 def between(repo, proto, pairs):
641 def between(repo, proto, pairs):
638 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
642 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
639 r = []
643 r = []
640 for b in repo.between(pairs):
644 for b in repo.between(pairs):
641 r.append(encodelist(b) + "\n")
645 r.append(encodelist(b) + "\n")
642 return "".join(r)
646 return "".join(r)
643
647
644 @wireprotocommand('branchmap')
648 @wireprotocommand('branchmap')
645 def branchmap(repo, proto):
649 def branchmap(repo, proto):
646 branchmap = repo.branchmap()
650 branchmap = repo.branchmap()
647 heads = []
651 heads = []
648 for branch, nodes in branchmap.iteritems():
652 for branch, nodes in branchmap.iteritems():
649 branchname = urlreq.quote(encoding.fromlocal(branch))
653 branchname = urlreq.quote(encoding.fromlocal(branch))
650 branchnodes = encodelist(nodes)
654 branchnodes = encodelist(nodes)
651 heads.append('%s %s' % (branchname, branchnodes))
655 heads.append('%s %s' % (branchname, branchnodes))
652 return '\n'.join(heads)
656 return '\n'.join(heads)
653
657
654 @wireprotocommand('branches', 'nodes')
658 @wireprotocommand('branches', 'nodes')
655 def branches(repo, proto, nodes):
659 def branches(repo, proto, nodes):
656 nodes = decodelist(nodes)
660 nodes = decodelist(nodes)
657 r = []
661 r = []
658 for b in repo.branches(nodes):
662 for b in repo.branches(nodes):
659 r.append(encodelist(b) + "\n")
663 r.append(encodelist(b) + "\n")
660 return "".join(r)
664 return "".join(r)
661
665
662 @wireprotocommand('clonebundles', '')
666 @wireprotocommand('clonebundles', '')
663 def clonebundles(repo, proto):
667 def clonebundles(repo, proto):
664 """Server command for returning info for available bundles to seed clones.
668 """Server command for returning info for available bundles to seed clones.
665
669
666 Clients will parse this response and determine what bundle to fetch.
670 Clients will parse this response and determine what bundle to fetch.
667
671
668 Extensions may wrap this command to filter or dynamically emit data
672 Extensions may wrap this command to filter or dynamically emit data
669 depending on the request. e.g. you could advertise URLs for the closest
673 depending on the request. e.g. you could advertise URLs for the closest
670 data center given the client's IP address.
674 data center given the client's IP address.
671 """
675 """
672 return repo.opener.tryread('clonebundles.manifest')
676 return repo.opener.tryread('clonebundles.manifest')
673
677
674 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
678 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
675 'known', 'getbundle', 'unbundlehash', 'batch']
679 'known', 'getbundle', 'unbundlehash', 'batch']
676
680
677 def _capabilities(repo, proto):
681 def _capabilities(repo, proto):
678 """return a list of capabilities for a repo
682 """return a list of capabilities for a repo
679
683
680 This function exists to allow extensions to easily wrap capabilities
684 This function exists to allow extensions to easily wrap capabilities
681 computation
685 computation
682
686
683 - returns a lists: easy to alter
687 - returns a lists: easy to alter
684 - change done here will be propagated to both `capabilities` and `hello`
688 - change done here will be propagated to both `capabilities` and `hello`
685 command without any other action needed.
689 command without any other action needed.
686 """
690 """
687 # copy to prevent modification of the global list
691 # copy to prevent modification of the global list
688 caps = list(wireprotocaps)
692 caps = list(wireprotocaps)
689 if streamclone.allowservergeneration(repo.ui):
693 if streamclone.allowservergeneration(repo.ui):
690 if repo.ui.configbool('server', 'preferuncompressed', False):
694 if repo.ui.configbool('server', 'preferuncompressed', False):
691 caps.append('stream-preferred')
695 caps.append('stream-preferred')
692 requiredformats = repo.requirements & repo.supportedformats
696 requiredformats = repo.requirements & repo.supportedformats
693 # if our local revlogs are just revlogv1, add 'stream' cap
697 # if our local revlogs are just revlogv1, add 'stream' cap
694 if not requiredformats - set(('revlogv1',)):
698 if not requiredformats - set(('revlogv1',)):
695 caps.append('stream')
699 caps.append('stream')
696 # otherwise, add 'streamreqs' detailing our local revlog format
700 # otherwise, add 'streamreqs' detailing our local revlog format
697 else:
701 else:
698 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
702 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
699 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
703 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
700 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
704 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
701 caps.append('bundle2=' + urlreq.quote(capsblob))
705 caps.append('bundle2=' + urlreq.quote(capsblob))
702 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
706 caps.append('unbundle=%s' % ','.join(bundle2.bundlepriority))
703 caps.append(
707 caps.append(
704 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
708 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
705 if repo.ui.configbool('experimental', 'httppostargs', False):
709 if repo.ui.configbool('experimental', 'httppostargs', False):
706 caps.append('httppostargs')
710 caps.append('httppostargs')
707 return caps
711 return caps
708
712
709 # If you are writing an extension and consider wrapping this function. Wrap
713 # If you are writing an extension and consider wrapping this function. Wrap
710 # `_capabilities` instead.
714 # `_capabilities` instead.
711 @wireprotocommand('capabilities')
715 @wireprotocommand('capabilities')
712 def capabilities(repo, proto):
716 def capabilities(repo, proto):
713 return ' '.join(_capabilities(repo, proto))
717 return ' '.join(_capabilities(repo, proto))
714
718
715 @wireprotocommand('changegroup', 'roots')
719 @wireprotocommand('changegroup', 'roots')
716 def changegroup(repo, proto, roots):
720 def changegroup(repo, proto, roots):
717 nodes = decodelist(roots)
721 nodes = decodelist(roots)
718 cg = changegroupmod.changegroup(repo, nodes, 'serve')
722 cg = changegroupmod.changegroup(repo, nodes, 'serve')
719 return streamres(proto.groupchunks(cg))
723 return streamres(proto.groupchunks(cg))
720
724
721 @wireprotocommand('changegroupsubset', 'bases heads')
725 @wireprotocommand('changegroupsubset', 'bases heads')
722 def changegroupsubset(repo, proto, bases, heads):
726 def changegroupsubset(repo, proto, bases, heads):
723 bases = decodelist(bases)
727 bases = decodelist(bases)
724 heads = decodelist(heads)
728 heads = decodelist(heads)
725 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
729 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
726 return streamres(proto.groupchunks(cg))
730 return streamres(proto.groupchunks(cg))
727
731
728 @wireprotocommand('debugwireargs', 'one two *')
732 @wireprotocommand('debugwireargs', 'one two *')
729 def debugwireargs(repo, proto, one, two, others):
733 def debugwireargs(repo, proto, one, two, others):
730 # only accept optional args from the known set
734 # only accept optional args from the known set
731 opts = options('debugwireargs', ['three', 'four'], others)
735 opts = options('debugwireargs', ['three', 'four'], others)
732 return repo.debugwireargs(one, two, **opts)
736 return repo.debugwireargs(one, two, **opts)
733
737
734 # List of options accepted by getbundle.
738 # List of options accepted by getbundle.
735 #
739 #
736 # Meant to be extended by extensions. It is the extension's responsibility to
740 # Meant to be extended by extensions. It is the extension's responsibility to
737 # ensure such options are properly processed in exchange.getbundle.
741 # ensure such options are properly processed in exchange.getbundle.
738 gboptslist = ['heads', 'common', 'bundlecaps']
742 gboptslist = ['heads', 'common', 'bundlecaps']
739
743
740 @wireprotocommand('getbundle', '*')
744 @wireprotocommand('getbundle', '*')
741 def getbundle(repo, proto, others):
745 def getbundle(repo, proto, others):
742 opts = options('getbundle', gboptsmap.keys(), others)
746 opts = options('getbundle', gboptsmap.keys(), others)
743 for k, v in opts.iteritems():
747 for k, v in opts.iteritems():
744 keytype = gboptsmap[k]
748 keytype = gboptsmap[k]
745 if keytype == 'nodes':
749 if keytype == 'nodes':
746 opts[k] = decodelist(v)
750 opts[k] = decodelist(v)
747 elif keytype == 'csv':
751 elif keytype == 'csv':
748 opts[k] = list(v.split(','))
752 opts[k] = list(v.split(','))
749 elif keytype == 'scsv':
753 elif keytype == 'scsv':
750 opts[k] = set(v.split(','))
754 opts[k] = set(v.split(','))
751 elif keytype == 'boolean':
755 elif keytype == 'boolean':
752 # Client should serialize False as '0', which is a non-empty string
756 # Client should serialize False as '0', which is a non-empty string
753 # so it evaluates as a True bool.
757 # so it evaluates as a True bool.
754 if v == '0':
758 if v == '0':
755 opts[k] = False
759 opts[k] = False
756 else:
760 else:
757 opts[k] = bool(v)
761 opts[k] = bool(v)
758 elif keytype != 'plain':
762 elif keytype != 'plain':
759 raise KeyError('unknown getbundle option type %s'
763 raise KeyError('unknown getbundle option type %s'
760 % keytype)
764 % keytype)
761
765
762 if not bundle1allowed(repo, 'pull'):
766 if not bundle1allowed(repo, 'pull'):
763 if not exchange.bundle2requested(opts.get('bundlecaps')):
767 if not exchange.bundle2requested(opts.get('bundlecaps')):
764 return ooberror(bundle2required)
768 return ooberror(bundle2required)
765
769
766 cg = exchange.getbundle(repo, 'serve', **opts)
770 cg = exchange.getbundle(repo, 'serve', **opts)
767 return streamres(proto.groupchunks(cg))
771 return streamres(proto.groupchunks(cg))
768
772
769 @wireprotocommand('heads')
773 @wireprotocommand('heads')
770 def heads(repo, proto):
774 def heads(repo, proto):
771 h = repo.heads()
775 h = repo.heads()
772 return encodelist(h) + "\n"
776 return encodelist(h) + "\n"
773
777
774 @wireprotocommand('hello')
778 @wireprotocommand('hello')
775 def hello(repo, proto):
779 def hello(repo, proto):
776 '''the hello command returns a set of lines describing various
780 '''the hello command returns a set of lines describing various
777 interesting things about the server, in an RFC822-like format.
781 interesting things about the server, in an RFC822-like format.
778 Currently the only one defined is "capabilities", which
782 Currently the only one defined is "capabilities", which
779 consists of a line in the form:
783 consists of a line in the form:
780
784
781 capabilities: space separated list of tokens
785 capabilities: space separated list of tokens
782 '''
786 '''
783 return "capabilities: %s\n" % (capabilities(repo, proto))
787 return "capabilities: %s\n" % (capabilities(repo, proto))
784
788
785 @wireprotocommand('listkeys', 'namespace')
789 @wireprotocommand('listkeys', 'namespace')
786 def listkeys(repo, proto, namespace):
790 def listkeys(repo, proto, namespace):
787 d = repo.listkeys(encoding.tolocal(namespace)).items()
791 d = repo.listkeys(encoding.tolocal(namespace)).items()
788 return pushkeymod.encodekeys(d)
792 return pushkeymod.encodekeys(d)
789
793
790 @wireprotocommand('lookup', 'key')
794 @wireprotocommand('lookup', 'key')
791 def lookup(repo, proto, key):
795 def lookup(repo, proto, key):
792 try:
796 try:
793 k = encoding.tolocal(key)
797 k = encoding.tolocal(key)
794 c = repo[k]
798 c = repo[k]
795 r = c.hex()
799 r = c.hex()
796 success = 1
800 success = 1
797 except Exception as inst:
801 except Exception as inst:
798 r = str(inst)
802 r = str(inst)
799 success = 0
803 success = 0
800 return "%s %s\n" % (success, r)
804 return "%s %s\n" % (success, r)
801
805
802 @wireprotocommand('known', 'nodes *')
806 @wireprotocommand('known', 'nodes *')
803 def known(repo, proto, nodes, others):
807 def known(repo, proto, nodes, others):
804 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
808 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
805
809
806 @wireprotocommand('pushkey', 'namespace key old new')
810 @wireprotocommand('pushkey', 'namespace key old new')
807 def pushkey(repo, proto, namespace, key, old, new):
811 def pushkey(repo, proto, namespace, key, old, new):
808 # compatibility with pre-1.8 clients which were accidentally
812 # compatibility with pre-1.8 clients which were accidentally
809 # sending raw binary nodes rather than utf-8-encoded hex
813 # sending raw binary nodes rather than utf-8-encoded hex
810 if len(new) == 20 and new.encode('string-escape') != new:
814 if len(new) == 20 and new.encode('string-escape') != new:
811 # looks like it could be a binary node
815 # looks like it could be a binary node
812 try:
816 try:
813 new.decode('utf-8')
817 new.decode('utf-8')
814 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
818 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
815 except UnicodeDecodeError:
819 except UnicodeDecodeError:
816 pass # binary, leave unmodified
820 pass # binary, leave unmodified
817 else:
821 else:
818 new = encoding.tolocal(new) # normal path
822 new = encoding.tolocal(new) # normal path
819
823
820 if util.safehasattr(proto, 'restore'):
824 if util.safehasattr(proto, 'restore'):
821
825
822 proto.redirect()
826 proto.redirect()
823
827
824 try:
828 try:
825 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
829 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
826 encoding.tolocal(old), new) or False
830 encoding.tolocal(old), new) or False
827 except error.Abort:
831 except error.Abort:
828 r = False
832 r = False
829
833
830 output = proto.restore()
834 output = proto.restore()
831
835
832 return '%s\n%s' % (int(r), output)
836 return '%s\n%s' % (int(r), output)
833
837
834 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
838 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
835 encoding.tolocal(old), new)
839 encoding.tolocal(old), new)
836 return '%s\n' % int(r)
840 return '%s\n' % int(r)
837
841
838 @wireprotocommand('stream_out')
842 @wireprotocommand('stream_out')
839 def stream(repo, proto):
843 def stream(repo, proto):
840 '''If the server supports streaming clone, it advertises the "stream"
844 '''If the server supports streaming clone, it advertises the "stream"
841 capability with a value representing the version and flags of the repo
845 capability with a value representing the version and flags of the repo
842 it is serving. Client checks to see if it understands the format.
846 it is serving. Client checks to see if it understands the format.
843 '''
847 '''
844 if not streamclone.allowservergeneration(repo.ui):
848 if not streamclone.allowservergeneration(repo.ui):
845 return '1\n'
849 return '1\n'
846
850
847 def getstream(it):
851 def getstream(it):
848 yield '0\n'
852 yield '0\n'
849 for chunk in it:
853 for chunk in it:
850 yield chunk
854 yield chunk
851
855
852 try:
856 try:
853 # LockError may be raised before the first result is yielded. Don't
857 # LockError may be raised before the first result is yielded. Don't
854 # emit output until we're sure we got the lock successfully.
858 # emit output until we're sure we got the lock successfully.
855 it = streamclone.generatev1wireproto(repo)
859 it = streamclone.generatev1wireproto(repo)
856 return streamres(getstream(it))
860 return streamres(getstream(it))
857 except error.LockError:
861 except error.LockError:
858 return '2\n'
862 return '2\n'
859
863
860 @wireprotocommand('unbundle', 'heads')
864 @wireprotocommand('unbundle', 'heads')
861 def unbundle(repo, proto, heads):
865 def unbundle(repo, proto, heads):
862 their_heads = decodelist(heads)
866 their_heads = decodelist(heads)
863
867
864 try:
868 try:
865 proto.redirect()
869 proto.redirect()
866
870
867 exchange.check_heads(repo, their_heads, 'preparing changes')
871 exchange.check_heads(repo, their_heads, 'preparing changes')
868
872
869 # write bundle data to temporary file because it can be big
873 # write bundle data to temporary file because it can be big
870 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
874 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
871 fp = os.fdopen(fd, 'wb+')
875 fp = os.fdopen(fd, 'wb+')
872 r = 0
876 r = 0
873 try:
877 try:
874 proto.getfile(fp)
878 proto.getfile(fp)
875 fp.seek(0)
879 fp.seek(0)
876 gen = exchange.readbundle(repo.ui, fp, None)
880 gen = exchange.readbundle(repo.ui, fp, None)
877 if (isinstance(gen, changegroupmod.cg1unpacker)
881 if (isinstance(gen, changegroupmod.cg1unpacker)
878 and not bundle1allowed(repo, 'push')):
882 and not bundle1allowed(repo, 'push')):
879 return ooberror(bundle2required)
883 return ooberror(bundle2required)
880
884
881 r = exchange.unbundle(repo, gen, their_heads, 'serve',
885 r = exchange.unbundle(repo, gen, their_heads, 'serve',
882 proto._client())
886 proto._client())
883 if util.safehasattr(r, 'addpart'):
887 if util.safehasattr(r, 'addpart'):
884 # The return looks streamable, we are in the bundle2 case and
888 # The return looks streamable, we are in the bundle2 case and
885 # should return a stream.
889 # should return a stream.
886 return streamres(r.getchunks())
890 return streamres(r.getchunks())
887 return pushres(r)
891 return pushres(r)
888
892
889 finally:
893 finally:
890 fp.close()
894 fp.close()
891 os.unlink(tempname)
895 os.unlink(tempname)
892
896
893 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
897 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
894 # handle non-bundle2 case first
898 # handle non-bundle2 case first
895 if not getattr(exc, 'duringunbundle2', False):
899 if not getattr(exc, 'duringunbundle2', False):
896 try:
900 try:
897 raise
901 raise
898 except error.Abort:
902 except error.Abort:
899 # The old code we moved used sys.stderr directly.
903 # The old code we moved used sys.stderr directly.
900 # We did not change it to minimise code change.
904 # We did not change it to minimise code change.
901 # This need to be moved to something proper.
905 # This need to be moved to something proper.
902 # Feel free to do it.
906 # Feel free to do it.
903 sys.stderr.write("abort: %s\n" % exc)
907 sys.stderr.write("abort: %s\n" % exc)
904 return pushres(0)
908 return pushres(0)
905 except error.PushRaced:
909 except error.PushRaced:
906 return pusherr(str(exc))
910 return pusherr(str(exc))
907
911
908 bundler = bundle2.bundle20(repo.ui)
912 bundler = bundle2.bundle20(repo.ui)
909 for out in getattr(exc, '_bundle2salvagedoutput', ()):
913 for out in getattr(exc, '_bundle2salvagedoutput', ()):
910 bundler.addpart(out)
914 bundler.addpart(out)
911 try:
915 try:
912 try:
916 try:
913 raise
917 raise
914 except error.PushkeyFailed as exc:
918 except error.PushkeyFailed as exc:
915 # check client caps
919 # check client caps
916 remotecaps = getattr(exc, '_replycaps', None)
920 remotecaps = getattr(exc, '_replycaps', None)
917 if (remotecaps is not None
921 if (remotecaps is not None
918 and 'pushkey' not in remotecaps.get('error', ())):
922 and 'pushkey' not in remotecaps.get('error', ())):
919 # no support remote side, fallback to Abort handler.
923 # no support remote side, fallback to Abort handler.
920 raise
924 raise
921 part = bundler.newpart('error:pushkey')
925 part = bundler.newpart('error:pushkey')
922 part.addparam('in-reply-to', exc.partid)
926 part.addparam('in-reply-to', exc.partid)
923 if exc.namespace is not None:
927 if exc.namespace is not None:
924 part.addparam('namespace', exc.namespace, mandatory=False)
928 part.addparam('namespace', exc.namespace, mandatory=False)
925 if exc.key is not None:
929 if exc.key is not None:
926 part.addparam('key', exc.key, mandatory=False)
930 part.addparam('key', exc.key, mandatory=False)
927 if exc.new is not None:
931 if exc.new is not None:
928 part.addparam('new', exc.new, mandatory=False)
932 part.addparam('new', exc.new, mandatory=False)
929 if exc.old is not None:
933 if exc.old is not None:
930 part.addparam('old', exc.old, mandatory=False)
934 part.addparam('old', exc.old, mandatory=False)
931 if exc.ret is not None:
935 if exc.ret is not None:
932 part.addparam('ret', exc.ret, mandatory=False)
936 part.addparam('ret', exc.ret, mandatory=False)
933 except error.BundleValueError as exc:
937 except error.BundleValueError as exc:
934 errpart = bundler.newpart('error:unsupportedcontent')
938 errpart = bundler.newpart('error:unsupportedcontent')
935 if exc.parttype is not None:
939 if exc.parttype is not None:
936 errpart.addparam('parttype', exc.parttype)
940 errpart.addparam('parttype', exc.parttype)
937 if exc.params:
941 if exc.params:
938 errpart.addparam('params', '\0'.join(exc.params))
942 errpart.addparam('params', '\0'.join(exc.params))
939 except error.Abort as exc:
943 except error.Abort as exc:
940 manargs = [('message', str(exc))]
944 manargs = [('message', str(exc))]
941 advargs = []
945 advargs = []
942 if exc.hint is not None:
946 if exc.hint is not None:
943 advargs.append(('hint', exc.hint))
947 advargs.append(('hint', exc.hint))
944 bundler.addpart(bundle2.bundlepart('error:abort',
948 bundler.addpart(bundle2.bundlepart('error:abort',
945 manargs, advargs))
949 manargs, advargs))
946 except error.PushRaced as exc:
950 except error.PushRaced as exc:
947 bundler.newpart('error:pushraced', [('message', str(exc))])
951 bundler.newpart('error:pushraced', [('message', str(exc))])
948 return streamres(bundler.getchunks())
952 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now