##// END OF EJS Templates
wireproto: document quirk of _callstream between http and ssh...
Augie Fackler -
r28435:176736af default
parent child Browse files
Show More
@@ -1,875 +1,878 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import os
10 import os
11 import sys
11 import sys
12 import tempfile
12 import tempfile
13 import urllib
13 import urllib
14
14
15 from .i18n import _
15 from .i18n import _
16 from .node import (
16 from .node import (
17 bin,
17 bin,
18 hex,
18 hex,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 encoding,
24 encoding,
25 error,
25 error,
26 exchange,
26 exchange,
27 peer,
27 peer,
28 pushkey as pushkeymod,
28 pushkey as pushkeymod,
29 streamclone,
29 streamclone,
30 util,
30 util,
31 )
31 )
32
32
33 bundle2required = _(
33 bundle2required = _(
34 'incompatible Mercurial client; bundle2 required\n'
34 'incompatible Mercurial client; bundle2 required\n'
35 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
35 '(see https://www.mercurial-scm.org/wiki/IncompatibleClient)\n')
36
36
37 class abstractserverproto(object):
37 class abstractserverproto(object):
38 """abstract class that summarizes the protocol API
38 """abstract class that summarizes the protocol API
39
39
40 Used as reference and documentation.
40 Used as reference and documentation.
41 """
41 """
42
42
43 def getargs(self, args):
43 def getargs(self, args):
44 """return the value for arguments in <args>
44 """return the value for arguments in <args>
45
45
46 returns a list of values (same order as <args>)"""
46 returns a list of values (same order as <args>)"""
47 raise NotImplementedError()
47 raise NotImplementedError()
48
48
49 def getfile(self, fp):
49 def getfile(self, fp):
50 """write the whole content of a file into a file like object
50 """write the whole content of a file into a file like object
51
51
52 The file is in the form::
52 The file is in the form::
53
53
54 (<chunk-size>\n<chunk>)+0\n
54 (<chunk-size>\n<chunk>)+0\n
55
55
56 chunk size is the ascii version of the int.
56 chunk size is the ascii version of the int.
57 """
57 """
58 raise NotImplementedError()
58 raise NotImplementedError()
59
59
60 def redirect(self):
60 def redirect(self):
61 """may setup interception for stdout and stderr
61 """may setup interception for stdout and stderr
62
62
63 See also the `restore` method."""
63 See also the `restore` method."""
64 raise NotImplementedError()
64 raise NotImplementedError()
65
65
66 # If the `redirect` function does install interception, the `restore`
66 # If the `redirect` function does install interception, the `restore`
67 # function MUST be defined. If interception is not used, this function
67 # function MUST be defined. If interception is not used, this function
68 # MUST NOT be defined.
68 # MUST NOT be defined.
69 #
69 #
70 # left commented here on purpose
70 # left commented here on purpose
71 #
71 #
72 #def restore(self):
72 #def restore(self):
73 # """reinstall previous stdout and stderr and return intercepted stdout
73 # """reinstall previous stdout and stderr and return intercepted stdout
74 # """
74 # """
75 # raise NotImplementedError()
75 # raise NotImplementedError()
76
76
77 def groupchunks(self, cg):
77 def groupchunks(self, cg):
78 """return 4096 chunks from a changegroup object
78 """return 4096 chunks from a changegroup object
79
79
80 Some protocols may have compressed the contents."""
80 Some protocols may have compressed the contents."""
81 raise NotImplementedError()
81 raise NotImplementedError()
82
82
83 class remotebatch(peer.batcher):
83 class remotebatch(peer.batcher):
84 '''batches the queued calls; uses as few roundtrips as possible'''
84 '''batches the queued calls; uses as few roundtrips as possible'''
85 def __init__(self, remote):
85 def __init__(self, remote):
86 '''remote must support _submitbatch(encbatch) and
86 '''remote must support _submitbatch(encbatch) and
87 _submitone(op, encargs)'''
87 _submitone(op, encargs)'''
88 peer.batcher.__init__(self)
88 peer.batcher.__init__(self)
89 self.remote = remote
89 self.remote = remote
90 def submit(self):
90 def submit(self):
91 req, rsp = [], []
91 req, rsp = [], []
92 for name, args, opts, resref in self.calls:
92 for name, args, opts, resref in self.calls:
93 mtd = getattr(self.remote, name)
93 mtd = getattr(self.remote, name)
94 batchablefn = getattr(mtd, 'batchable', None)
94 batchablefn = getattr(mtd, 'batchable', None)
95 if batchablefn is not None:
95 if batchablefn is not None:
96 batchable = batchablefn(mtd.im_self, *args, **opts)
96 batchable = batchablefn(mtd.im_self, *args, **opts)
97 encargsorres, encresref = batchable.next()
97 encargsorres, encresref = batchable.next()
98 if encresref:
98 if encresref:
99 req.append((name, encargsorres,))
99 req.append((name, encargsorres,))
100 rsp.append((batchable, encresref, resref,))
100 rsp.append((batchable, encresref, resref,))
101 else:
101 else:
102 resref.set(encargsorres)
102 resref.set(encargsorres)
103 else:
103 else:
104 if req:
104 if req:
105 self._submitreq(req, rsp)
105 self._submitreq(req, rsp)
106 req, rsp = [], []
106 req, rsp = [], []
107 resref.set(mtd(*args, **opts))
107 resref.set(mtd(*args, **opts))
108 if req:
108 if req:
109 self._submitreq(req, rsp)
109 self._submitreq(req, rsp)
110 def _submitreq(self, req, rsp):
110 def _submitreq(self, req, rsp):
111 encresults = self.remote._submitbatch(req)
111 encresults = self.remote._submitbatch(req)
112 for encres, r in zip(encresults, rsp):
112 for encres, r in zip(encresults, rsp):
113 batchable, encresref, resref = r
113 batchable, encresref, resref = r
114 encresref.set(encres)
114 encresref.set(encres)
115 resref.set(batchable.next())
115 resref.set(batchable.next())
116
116
117 # Forward a couple of names from peer to make wireproto interactions
117 # Forward a couple of names from peer to make wireproto interactions
118 # slightly more sensible.
118 # slightly more sensible.
119 batchable = peer.batchable
119 batchable = peer.batchable
120 future = peer.future
120 future = peer.future
121
121
122 # list of nodes encoding / decoding
122 # list of nodes encoding / decoding
123
123
124 def decodelist(l, sep=' '):
124 def decodelist(l, sep=' '):
125 if l:
125 if l:
126 return map(bin, l.split(sep))
126 return map(bin, l.split(sep))
127 return []
127 return []
128
128
129 def encodelist(l, sep=' '):
129 def encodelist(l, sep=' '):
130 try:
130 try:
131 return sep.join(map(hex, l))
131 return sep.join(map(hex, l))
132 except TypeError:
132 except TypeError:
133 raise
133 raise
134
134
135 # batched call argument encoding
135 # batched call argument encoding
136
136
137 def escapearg(plain):
137 def escapearg(plain):
138 return (plain
138 return (plain
139 .replace(':', ':c')
139 .replace(':', ':c')
140 .replace(',', ':o')
140 .replace(',', ':o')
141 .replace(';', ':s')
141 .replace(';', ':s')
142 .replace('=', ':e'))
142 .replace('=', ':e'))
143
143
144 def unescapearg(escaped):
144 def unescapearg(escaped):
145 return (escaped
145 return (escaped
146 .replace(':e', '=')
146 .replace(':e', '=')
147 .replace(':s', ';')
147 .replace(':s', ';')
148 .replace(':o', ',')
148 .replace(':o', ',')
149 .replace(':c', ':'))
149 .replace(':c', ':'))
150
150
151 # mapping of options accepted by getbundle and their types
151 # mapping of options accepted by getbundle and their types
152 #
152 #
153 # Meant to be extended by extensions. It is extensions responsibility to ensure
153 # Meant to be extended by extensions. It is extensions responsibility to ensure
154 # such options are properly processed in exchange.getbundle.
154 # such options are properly processed in exchange.getbundle.
155 #
155 #
156 # supported types are:
156 # supported types are:
157 #
157 #
158 # :nodes: list of binary nodes
158 # :nodes: list of binary nodes
159 # :csv: list of comma-separated values
159 # :csv: list of comma-separated values
160 # :scsv: list of comma-separated values return as set
160 # :scsv: list of comma-separated values return as set
161 # :plain: string with no transformation needed.
161 # :plain: string with no transformation needed.
162 gboptsmap = {'heads': 'nodes',
162 gboptsmap = {'heads': 'nodes',
163 'common': 'nodes',
163 'common': 'nodes',
164 'obsmarkers': 'boolean',
164 'obsmarkers': 'boolean',
165 'bundlecaps': 'scsv',
165 'bundlecaps': 'scsv',
166 'listkeys': 'csv',
166 'listkeys': 'csv',
167 'cg': 'boolean',
167 'cg': 'boolean',
168 'cbattempted': 'boolean'}
168 'cbattempted': 'boolean'}
169
169
170 # client side
170 # client side
171
171
172 class wirepeer(peer.peerrepository):
172 class wirepeer(peer.peerrepository):
173 """Client-side interface for communicating with a peer repository.
173 """Client-side interface for communicating with a peer repository.
174
174
175 Methods commonly call wire protocol commands of the same name.
175 Methods commonly call wire protocol commands of the same name.
176
176
177 See also httppeer.py and sshpeer.py for protocol-specific
177 See also httppeer.py and sshpeer.py for protocol-specific
178 implementations of this interface.
178 implementations of this interface.
179 """
179 """
180 def batch(self):
180 def batch(self):
181 if self.capable('batch'):
181 if self.capable('batch'):
182 return remotebatch(self)
182 return remotebatch(self)
183 else:
183 else:
184 return peer.localbatch(self)
184 return peer.localbatch(self)
185 def _submitbatch(self, req):
185 def _submitbatch(self, req):
186 cmds = []
186 cmds = []
187 for op, argsdict in req:
187 for op, argsdict in req:
188 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
188 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
189 for k, v in argsdict.iteritems())
189 for k, v in argsdict.iteritems())
190 cmds.append('%s %s' % (op, args))
190 cmds.append('%s %s' % (op, args))
191 rsp = self._call("batch", cmds=';'.join(cmds))
191 rsp = self._call("batch", cmds=';'.join(cmds))
192 return [unescapearg(r) for r in rsp.split(';')]
192 return [unescapearg(r) for r in rsp.split(';')]
193 def _submitone(self, op, args):
193 def _submitone(self, op, args):
194 return self._call(op, **args)
194 return self._call(op, **args)
195
195
196 @batchable
196 @batchable
197 def lookup(self, key):
197 def lookup(self, key):
198 self.requirecap('lookup', _('look up remote revision'))
198 self.requirecap('lookup', _('look up remote revision'))
199 f = future()
199 f = future()
200 yield {'key': encoding.fromlocal(key)}, f
200 yield {'key': encoding.fromlocal(key)}, f
201 d = f.value
201 d = f.value
202 success, data = d[:-1].split(" ", 1)
202 success, data = d[:-1].split(" ", 1)
203 if int(success):
203 if int(success):
204 yield bin(data)
204 yield bin(data)
205 self._abort(error.RepoError(data))
205 self._abort(error.RepoError(data))
206
206
207 @batchable
207 @batchable
208 def heads(self):
208 def heads(self):
209 f = future()
209 f = future()
210 yield {}, f
210 yield {}, f
211 d = f.value
211 d = f.value
212 try:
212 try:
213 yield decodelist(d[:-1])
213 yield decodelist(d[:-1])
214 except ValueError:
214 except ValueError:
215 self._abort(error.ResponseError(_("unexpected response:"), d))
215 self._abort(error.ResponseError(_("unexpected response:"), d))
216
216
217 @batchable
217 @batchable
218 def known(self, nodes):
218 def known(self, nodes):
219 f = future()
219 f = future()
220 yield {'nodes': encodelist(nodes)}, f
220 yield {'nodes': encodelist(nodes)}, f
221 d = f.value
221 d = f.value
222 try:
222 try:
223 yield [bool(int(b)) for b in d]
223 yield [bool(int(b)) for b in d]
224 except ValueError:
224 except ValueError:
225 self._abort(error.ResponseError(_("unexpected response:"), d))
225 self._abort(error.ResponseError(_("unexpected response:"), d))
226
226
227 @batchable
227 @batchable
228 def branchmap(self):
228 def branchmap(self):
229 f = future()
229 f = future()
230 yield {}, f
230 yield {}, f
231 d = f.value
231 d = f.value
232 try:
232 try:
233 branchmap = {}
233 branchmap = {}
234 for branchpart in d.splitlines():
234 for branchpart in d.splitlines():
235 branchname, branchheads = branchpart.split(' ', 1)
235 branchname, branchheads = branchpart.split(' ', 1)
236 branchname = encoding.tolocal(urllib.unquote(branchname))
236 branchname = encoding.tolocal(urllib.unquote(branchname))
237 branchheads = decodelist(branchheads)
237 branchheads = decodelist(branchheads)
238 branchmap[branchname] = branchheads
238 branchmap[branchname] = branchheads
239 yield branchmap
239 yield branchmap
240 except TypeError:
240 except TypeError:
241 self._abort(error.ResponseError(_("unexpected response:"), d))
241 self._abort(error.ResponseError(_("unexpected response:"), d))
242
242
243 def branches(self, nodes):
243 def branches(self, nodes):
244 n = encodelist(nodes)
244 n = encodelist(nodes)
245 d = self._call("branches", nodes=n)
245 d = self._call("branches", nodes=n)
246 try:
246 try:
247 br = [tuple(decodelist(b)) for b in d.splitlines()]
247 br = [tuple(decodelist(b)) for b in d.splitlines()]
248 return br
248 return br
249 except ValueError:
249 except ValueError:
250 self._abort(error.ResponseError(_("unexpected response:"), d))
250 self._abort(error.ResponseError(_("unexpected response:"), d))
251
251
252 def between(self, pairs):
252 def between(self, pairs):
253 batch = 8 # avoid giant requests
253 batch = 8 # avoid giant requests
254 r = []
254 r = []
255 for i in xrange(0, len(pairs), batch):
255 for i in xrange(0, len(pairs), batch):
256 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
256 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
257 d = self._call("between", pairs=n)
257 d = self._call("between", pairs=n)
258 try:
258 try:
259 r.extend(l and decodelist(l) or [] for l in d.splitlines())
259 r.extend(l and decodelist(l) or [] for l in d.splitlines())
260 except ValueError:
260 except ValueError:
261 self._abort(error.ResponseError(_("unexpected response:"), d))
261 self._abort(error.ResponseError(_("unexpected response:"), d))
262 return r
262 return r
263
263
264 @batchable
264 @batchable
265 def pushkey(self, namespace, key, old, new):
265 def pushkey(self, namespace, key, old, new):
266 if not self.capable('pushkey'):
266 if not self.capable('pushkey'):
267 yield False, None
267 yield False, None
268 f = future()
268 f = future()
269 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
269 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
270 yield {'namespace': encoding.fromlocal(namespace),
270 yield {'namespace': encoding.fromlocal(namespace),
271 'key': encoding.fromlocal(key),
271 'key': encoding.fromlocal(key),
272 'old': encoding.fromlocal(old),
272 'old': encoding.fromlocal(old),
273 'new': encoding.fromlocal(new)}, f
273 'new': encoding.fromlocal(new)}, f
274 d = f.value
274 d = f.value
275 d, output = d.split('\n', 1)
275 d, output = d.split('\n', 1)
276 try:
276 try:
277 d = bool(int(d))
277 d = bool(int(d))
278 except ValueError:
278 except ValueError:
279 raise error.ResponseError(
279 raise error.ResponseError(
280 _('push failed (unexpected response):'), d)
280 _('push failed (unexpected response):'), d)
281 for l in output.splitlines(True):
281 for l in output.splitlines(True):
282 self.ui.status(_('remote: '), l)
282 self.ui.status(_('remote: '), l)
283 yield d
283 yield d
284
284
285 @batchable
285 @batchable
286 def listkeys(self, namespace):
286 def listkeys(self, namespace):
287 if not self.capable('pushkey'):
287 if not self.capable('pushkey'):
288 yield {}, None
288 yield {}, None
289 f = future()
289 f = future()
290 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
290 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
291 yield {'namespace': encoding.fromlocal(namespace)}, f
291 yield {'namespace': encoding.fromlocal(namespace)}, f
292 d = f.value
292 d = f.value
293 self.ui.debug('received listkey for "%s": %i bytes\n'
293 self.ui.debug('received listkey for "%s": %i bytes\n'
294 % (namespace, len(d)))
294 % (namespace, len(d)))
295 yield pushkeymod.decodekeys(d)
295 yield pushkeymod.decodekeys(d)
296
296
297 def stream_out(self):
297 def stream_out(self):
298 return self._callstream('stream_out')
298 return self._callstream('stream_out')
299
299
300 def changegroup(self, nodes, kind):
300 def changegroup(self, nodes, kind):
301 n = encodelist(nodes)
301 n = encodelist(nodes)
302 f = self._callcompressable("changegroup", roots=n)
302 f = self._callcompressable("changegroup", roots=n)
303 return changegroupmod.cg1unpacker(f, 'UN')
303 return changegroupmod.cg1unpacker(f, 'UN')
304
304
305 def changegroupsubset(self, bases, heads, kind):
305 def changegroupsubset(self, bases, heads, kind):
306 self.requirecap('changegroupsubset', _('look up remote changes'))
306 self.requirecap('changegroupsubset', _('look up remote changes'))
307 bases = encodelist(bases)
307 bases = encodelist(bases)
308 heads = encodelist(heads)
308 heads = encodelist(heads)
309 f = self._callcompressable("changegroupsubset",
309 f = self._callcompressable("changegroupsubset",
310 bases=bases, heads=heads)
310 bases=bases, heads=heads)
311 return changegroupmod.cg1unpacker(f, 'UN')
311 return changegroupmod.cg1unpacker(f, 'UN')
312
312
313 def getbundle(self, source, **kwargs):
313 def getbundle(self, source, **kwargs):
314 self.requirecap('getbundle', _('look up remote changes'))
314 self.requirecap('getbundle', _('look up remote changes'))
315 opts = {}
315 opts = {}
316 bundlecaps = kwargs.get('bundlecaps')
316 bundlecaps = kwargs.get('bundlecaps')
317 if bundlecaps is not None:
317 if bundlecaps is not None:
318 kwargs['bundlecaps'] = sorted(bundlecaps)
318 kwargs['bundlecaps'] = sorted(bundlecaps)
319 else:
319 else:
320 bundlecaps = () # kwargs could have it to None
320 bundlecaps = () # kwargs could have it to None
321 for key, value in kwargs.iteritems():
321 for key, value in kwargs.iteritems():
322 if value is None:
322 if value is None:
323 continue
323 continue
324 keytype = gboptsmap.get(key)
324 keytype = gboptsmap.get(key)
325 if keytype is None:
325 if keytype is None:
326 assert False, 'unexpected'
326 assert False, 'unexpected'
327 elif keytype == 'nodes':
327 elif keytype == 'nodes':
328 value = encodelist(value)
328 value = encodelist(value)
329 elif keytype in ('csv', 'scsv'):
329 elif keytype in ('csv', 'scsv'):
330 value = ','.join(value)
330 value = ','.join(value)
331 elif keytype == 'boolean':
331 elif keytype == 'boolean':
332 value = '%i' % bool(value)
332 value = '%i' % bool(value)
333 elif keytype != 'plain':
333 elif keytype != 'plain':
334 raise KeyError('unknown getbundle option type %s'
334 raise KeyError('unknown getbundle option type %s'
335 % keytype)
335 % keytype)
336 opts[key] = value
336 opts[key] = value
337 f = self._callcompressable("getbundle", **opts)
337 f = self._callcompressable("getbundle", **opts)
338 if any((cap.startswith('HG2') for cap in bundlecaps)):
338 if any((cap.startswith('HG2') for cap in bundlecaps)):
339 return bundle2.getunbundler(self.ui, f)
339 return bundle2.getunbundler(self.ui, f)
340 else:
340 else:
341 return changegroupmod.cg1unpacker(f, 'UN')
341 return changegroupmod.cg1unpacker(f, 'UN')
342
342
343 def unbundle(self, cg, heads, source):
343 def unbundle(self, cg, heads, source):
344 '''Send cg (a readable file-like object representing the
344 '''Send cg (a readable file-like object representing the
345 changegroup to push, typically a chunkbuffer object) to the
345 changegroup to push, typically a chunkbuffer object) to the
346 remote server as a bundle.
346 remote server as a bundle.
347
347
348 When pushing a bundle10 stream, return an integer indicating the
348 When pushing a bundle10 stream, return an integer indicating the
349 result of the push (see localrepository.addchangegroup()).
349 result of the push (see localrepository.addchangegroup()).
350
350
351 When pushing a bundle20 stream, return a bundle20 stream.'''
351 When pushing a bundle20 stream, return a bundle20 stream.'''
352
352
353 if heads != ['force'] and self.capable('unbundlehash'):
353 if heads != ['force'] and self.capable('unbundlehash'):
354 heads = encodelist(['hashed',
354 heads = encodelist(['hashed',
355 util.sha1(''.join(sorted(heads))).digest()])
355 util.sha1(''.join(sorted(heads))).digest()])
356 else:
356 else:
357 heads = encodelist(heads)
357 heads = encodelist(heads)
358
358
359 if util.safehasattr(cg, 'deltaheader'):
359 if util.safehasattr(cg, 'deltaheader'):
360 # this a bundle10, do the old style call sequence
360 # this a bundle10, do the old style call sequence
361 ret, output = self._callpush("unbundle", cg, heads=heads)
361 ret, output = self._callpush("unbundle", cg, heads=heads)
362 if ret == "":
362 if ret == "":
363 raise error.ResponseError(
363 raise error.ResponseError(
364 _('push failed:'), output)
364 _('push failed:'), output)
365 try:
365 try:
366 ret = int(ret)
366 ret = int(ret)
367 except ValueError:
367 except ValueError:
368 raise error.ResponseError(
368 raise error.ResponseError(
369 _('push failed (unexpected response):'), ret)
369 _('push failed (unexpected response):'), ret)
370
370
371 for l in output.splitlines(True):
371 for l in output.splitlines(True):
372 self.ui.status(_('remote: '), l)
372 self.ui.status(_('remote: '), l)
373 else:
373 else:
374 # bundle2 push. Send a stream, fetch a stream.
374 # bundle2 push. Send a stream, fetch a stream.
375 stream = self._calltwowaystream('unbundle', cg, heads=heads)
375 stream = self._calltwowaystream('unbundle', cg, heads=heads)
376 ret = bundle2.getunbundler(self.ui, stream)
376 ret = bundle2.getunbundler(self.ui, stream)
377 return ret
377 return ret
378
378
379 def debugwireargs(self, one, two, three=None, four=None, five=None):
379 def debugwireargs(self, one, two, three=None, four=None, five=None):
380 # don't pass optional arguments left at their default value
380 # don't pass optional arguments left at their default value
381 opts = {}
381 opts = {}
382 if three is not None:
382 if three is not None:
383 opts['three'] = three
383 opts['three'] = three
384 if four is not None:
384 if four is not None:
385 opts['four'] = four
385 opts['four'] = four
386 return self._call('debugwireargs', one=one, two=two, **opts)
386 return self._call('debugwireargs', one=one, two=two, **opts)
387
387
388 def _call(self, cmd, **args):
388 def _call(self, cmd, **args):
389 """execute <cmd> on the server
389 """execute <cmd> on the server
390
390
391 The command is expected to return a simple string.
391 The command is expected to return a simple string.
392
392
393 returns the server reply as a string."""
393 returns the server reply as a string."""
394 raise NotImplementedError()
394 raise NotImplementedError()
395
395
396 def _callstream(self, cmd, **args):
396 def _callstream(self, cmd, **args):
397 """execute <cmd> on the server
397 """execute <cmd> on the server
398
398
399 The command is expected to return a stream.
399 The command is expected to return a stream. Note that if the
400 command doesn't return a stream, _callstream behaves
401 differently for ssh and http peers.
400
402
401 returns the server reply as a file like object."""
403 returns the server reply as a file like object.
404 """
402 raise NotImplementedError()
405 raise NotImplementedError()
403
406
404 def _callcompressable(self, cmd, **args):
407 def _callcompressable(self, cmd, **args):
405 """execute <cmd> on the server
408 """execute <cmd> on the server
406
409
407 The command is expected to return a stream.
410 The command is expected to return a stream.
408
411
409 The stream may have been compressed in some implementations. This
412 The stream may have been compressed in some implementations. This
410 function takes care of the decompression. This is the only difference
413 function takes care of the decompression. This is the only difference
411 with _callstream.
414 with _callstream.
412
415
413 returns the server reply as a file like object.
416 returns the server reply as a file like object.
414 """
417 """
415 raise NotImplementedError()
418 raise NotImplementedError()
416
419
417 def _callpush(self, cmd, fp, **args):
420 def _callpush(self, cmd, fp, **args):
418 """execute a <cmd> on server
421 """execute a <cmd> on server
419
422
420 The command is expected to be related to a push. Push has a special
423 The command is expected to be related to a push. Push has a special
421 return method.
424 return method.
422
425
423 returns the server reply as a (ret, output) tuple. ret is either
426 returns the server reply as a (ret, output) tuple. ret is either
424 empty (error) or a stringified int.
427 empty (error) or a stringified int.
425 """
428 """
426 raise NotImplementedError()
429 raise NotImplementedError()
427
430
428 def _calltwowaystream(self, cmd, fp, **args):
431 def _calltwowaystream(self, cmd, fp, **args):
429 """execute <cmd> on server
432 """execute <cmd> on server
430
433
431 The command will send a stream to the server and get a stream in reply.
434 The command will send a stream to the server and get a stream in reply.
432 """
435 """
433 raise NotImplementedError()
436 raise NotImplementedError()
434
437
435 def _abort(self, exception):
438 def _abort(self, exception):
436 """clearly abort the wire protocol connection and raise the exception
439 """clearly abort the wire protocol connection and raise the exception
437 """
440 """
438 raise NotImplementedError()
441 raise NotImplementedError()
439
442
440 # server side
443 # server side
441
444
442 # wire protocol command can either return a string or one of these classes.
445 # wire protocol command can either return a string or one of these classes.
443 class streamres(object):
446 class streamres(object):
444 """wireproto reply: binary stream
447 """wireproto reply: binary stream
445
448
446 The call was successful and the result is a stream.
449 The call was successful and the result is a stream.
447 Iterate on the `self.gen` attribute to retrieve chunks.
450 Iterate on the `self.gen` attribute to retrieve chunks.
448 """
451 """
449 def __init__(self, gen):
452 def __init__(self, gen):
450 self.gen = gen
453 self.gen = gen
451
454
452 class pushres(object):
455 class pushres(object):
453 """wireproto reply: success with simple integer return
456 """wireproto reply: success with simple integer return
454
457
455 The call was successful and returned an integer contained in `self.res`.
458 The call was successful and returned an integer contained in `self.res`.
456 """
459 """
457 def __init__(self, res):
460 def __init__(self, res):
458 self.res = res
461 self.res = res
459
462
460 class pusherr(object):
463 class pusherr(object):
461 """wireproto reply: failure
464 """wireproto reply: failure
462
465
463 The call failed. The `self.res` attribute contains the error message.
466 The call failed. The `self.res` attribute contains the error message.
464 """
467 """
465 def __init__(self, res):
468 def __init__(self, res):
466 self.res = res
469 self.res = res
467
470
468 class ooberror(object):
471 class ooberror(object):
469 """wireproto reply: failure of a batch of operation
472 """wireproto reply: failure of a batch of operation
470
473
471 Something failed during a batch call. The error message is stored in
474 Something failed during a batch call. The error message is stored in
472 `self.message`.
475 `self.message`.
473 """
476 """
474 def __init__(self, message):
477 def __init__(self, message):
475 self.message = message
478 self.message = message
476
479
477 def dispatch(repo, proto, command):
480 def dispatch(repo, proto, command):
478 repo = repo.filtered("served")
481 repo = repo.filtered("served")
479 func, spec = commands[command]
482 func, spec = commands[command]
480 args = proto.getargs(spec)
483 args = proto.getargs(spec)
481 return func(repo, proto, *args)
484 return func(repo, proto, *args)
482
485
483 def options(cmd, keys, others):
486 def options(cmd, keys, others):
484 opts = {}
487 opts = {}
485 for k in keys:
488 for k in keys:
486 if k in others:
489 if k in others:
487 opts[k] = others[k]
490 opts[k] = others[k]
488 del others[k]
491 del others[k]
489 if others:
492 if others:
490 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
493 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
491 % (cmd, ",".join(others)))
494 % (cmd, ",".join(others)))
492 return opts
495 return opts
493
496
494 def bundle1allowed(repo, action):
497 def bundle1allowed(repo, action):
495 """Whether a bundle1 operation is allowed from the server.
498 """Whether a bundle1 operation is allowed from the server.
496
499
497 Priority is:
500 Priority is:
498
501
499 1. server.bundle1gd.<action> (if generaldelta active)
502 1. server.bundle1gd.<action> (if generaldelta active)
500 2. server.bundle1.<action>
503 2. server.bundle1.<action>
501 3. server.bundle1gd (if generaldelta active)
504 3. server.bundle1gd (if generaldelta active)
502 4. server.bundle1
505 4. server.bundle1
503 """
506 """
504 ui = repo.ui
507 ui = repo.ui
505 gd = 'generaldelta' in repo.requirements
508 gd = 'generaldelta' in repo.requirements
506
509
507 if gd:
510 if gd:
508 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
511 v = ui.configbool('server', 'bundle1gd.%s' % action, None)
509 if v is not None:
512 if v is not None:
510 return v
513 return v
511
514
512 v = ui.configbool('server', 'bundle1.%s' % action, None)
515 v = ui.configbool('server', 'bundle1.%s' % action, None)
513 if v is not None:
516 if v is not None:
514 return v
517 return v
515
518
516 if gd:
519 if gd:
517 v = ui.configbool('server', 'bundle1gd', None)
520 v = ui.configbool('server', 'bundle1gd', None)
518 if v is not None:
521 if v is not None:
519 return v
522 return v
520
523
521 return ui.configbool('server', 'bundle1', True)
524 return ui.configbool('server', 'bundle1', True)
522
525
523 # list of commands
526 # list of commands
524 commands = {}
527 commands = {}
525
528
526 def wireprotocommand(name, args=''):
529 def wireprotocommand(name, args=''):
527 """decorator for wire protocol command"""
530 """decorator for wire protocol command"""
528 def register(func):
531 def register(func):
529 commands[name] = (func, args)
532 commands[name] = (func, args)
530 return func
533 return func
531 return register
534 return register
532
535
533 @wireprotocommand('batch', 'cmds *')
536 @wireprotocommand('batch', 'cmds *')
534 def batch(repo, proto, cmds, others):
537 def batch(repo, proto, cmds, others):
535 repo = repo.filtered("served")
538 repo = repo.filtered("served")
536 res = []
539 res = []
537 for pair in cmds.split(';'):
540 for pair in cmds.split(';'):
538 op, args = pair.split(' ', 1)
541 op, args = pair.split(' ', 1)
539 vals = {}
542 vals = {}
540 for a in args.split(','):
543 for a in args.split(','):
541 if a:
544 if a:
542 n, v = a.split('=')
545 n, v = a.split('=')
543 vals[n] = unescapearg(v)
546 vals[n] = unescapearg(v)
544 func, spec = commands[op]
547 func, spec = commands[op]
545 if spec:
548 if spec:
546 keys = spec.split()
549 keys = spec.split()
547 data = {}
550 data = {}
548 for k in keys:
551 for k in keys:
549 if k == '*':
552 if k == '*':
550 star = {}
553 star = {}
551 for key in vals.keys():
554 for key in vals.keys():
552 if key not in keys:
555 if key not in keys:
553 star[key] = vals[key]
556 star[key] = vals[key]
554 data['*'] = star
557 data['*'] = star
555 else:
558 else:
556 data[k] = vals[k]
559 data[k] = vals[k]
557 result = func(repo, proto, *[data[k] for k in keys])
560 result = func(repo, proto, *[data[k] for k in keys])
558 else:
561 else:
559 result = func(repo, proto)
562 result = func(repo, proto)
560 if isinstance(result, ooberror):
563 if isinstance(result, ooberror):
561 return result
564 return result
562 res.append(escapearg(result))
565 res.append(escapearg(result))
563 return ';'.join(res)
566 return ';'.join(res)
564
567
565 @wireprotocommand('between', 'pairs')
568 @wireprotocommand('between', 'pairs')
566 def between(repo, proto, pairs):
569 def between(repo, proto, pairs):
567 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
570 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
568 r = []
571 r = []
569 for b in repo.between(pairs):
572 for b in repo.between(pairs):
570 r.append(encodelist(b) + "\n")
573 r.append(encodelist(b) + "\n")
571 return "".join(r)
574 return "".join(r)
572
575
573 @wireprotocommand('branchmap')
576 @wireprotocommand('branchmap')
574 def branchmap(repo, proto):
577 def branchmap(repo, proto):
575 branchmap = repo.branchmap()
578 branchmap = repo.branchmap()
576 heads = []
579 heads = []
577 for branch, nodes in branchmap.iteritems():
580 for branch, nodes in branchmap.iteritems():
578 branchname = urllib.quote(encoding.fromlocal(branch))
581 branchname = urllib.quote(encoding.fromlocal(branch))
579 branchnodes = encodelist(nodes)
582 branchnodes = encodelist(nodes)
580 heads.append('%s %s' % (branchname, branchnodes))
583 heads.append('%s %s' % (branchname, branchnodes))
581 return '\n'.join(heads)
584 return '\n'.join(heads)
582
585
583 @wireprotocommand('branches', 'nodes')
586 @wireprotocommand('branches', 'nodes')
584 def branches(repo, proto, nodes):
587 def branches(repo, proto, nodes):
585 nodes = decodelist(nodes)
588 nodes = decodelist(nodes)
586 r = []
589 r = []
587 for b in repo.branches(nodes):
590 for b in repo.branches(nodes):
588 r.append(encodelist(b) + "\n")
591 r.append(encodelist(b) + "\n")
589 return "".join(r)
592 return "".join(r)
590
593
591 @wireprotocommand('clonebundles', '')
594 @wireprotocommand('clonebundles', '')
592 def clonebundles(repo, proto):
595 def clonebundles(repo, proto):
593 """Server command for returning info for available bundles to seed clones.
596 """Server command for returning info for available bundles to seed clones.
594
597
595 Clients will parse this response and determine what bundle to fetch.
598 Clients will parse this response and determine what bundle to fetch.
596
599
597 Extensions may wrap this command to filter or dynamically emit data
600 Extensions may wrap this command to filter or dynamically emit data
598 depending on the request. e.g. you could advertise URLs for the closest
601 depending on the request. e.g. you could advertise URLs for the closest
599 data center given the client's IP address.
602 data center given the client's IP address.
600 """
603 """
601 return repo.opener.tryread('clonebundles.manifest')
604 return repo.opener.tryread('clonebundles.manifest')
602
605
603 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
606 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
604 'known', 'getbundle', 'unbundlehash', 'batch']
607 'known', 'getbundle', 'unbundlehash', 'batch']
605
608
606 def _capabilities(repo, proto):
609 def _capabilities(repo, proto):
607 """return a list of capabilities for a repo
610 """return a list of capabilities for a repo
608
611
609 This function exists to allow extensions to easily wrap capabilities
612 This function exists to allow extensions to easily wrap capabilities
610 computation
613 computation
611
614
612 - returns a lists: easy to alter
615 - returns a lists: easy to alter
613 - change done here will be propagated to both `capabilities` and `hello`
616 - change done here will be propagated to both `capabilities` and `hello`
614 command without any other action needed.
617 command without any other action needed.
615 """
618 """
616 # copy to prevent modification of the global list
619 # copy to prevent modification of the global list
617 caps = list(wireprotocaps)
620 caps = list(wireprotocaps)
618 if streamclone.allowservergeneration(repo.ui):
621 if streamclone.allowservergeneration(repo.ui):
619 if repo.ui.configbool('server', 'preferuncompressed', False):
622 if repo.ui.configbool('server', 'preferuncompressed', False):
620 caps.append('stream-preferred')
623 caps.append('stream-preferred')
621 requiredformats = repo.requirements & repo.supportedformats
624 requiredformats = repo.requirements & repo.supportedformats
622 # if our local revlogs are just revlogv1, add 'stream' cap
625 # if our local revlogs are just revlogv1, add 'stream' cap
623 if not requiredformats - set(('revlogv1',)):
626 if not requiredformats - set(('revlogv1',)):
624 caps.append('stream')
627 caps.append('stream')
625 # otherwise, add 'streamreqs' detailing our local revlog format
628 # otherwise, add 'streamreqs' detailing our local revlog format
626 else:
629 else:
627 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
630 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
628 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
631 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
629 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
632 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
630 caps.append('bundle2=' + urllib.quote(capsblob))
633 caps.append('bundle2=' + urllib.quote(capsblob))
631 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
634 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
632 caps.append(
635 caps.append(
633 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
636 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
634 return caps
637 return caps
635
638
636 # If you are writing an extension and consider wrapping this function. Wrap
639 # If you are writing an extension and consider wrapping this function. Wrap
637 # `_capabilities` instead.
640 # `_capabilities` instead.
638 @wireprotocommand('capabilities')
641 @wireprotocommand('capabilities')
639 def capabilities(repo, proto):
642 def capabilities(repo, proto):
640 return ' '.join(_capabilities(repo, proto))
643 return ' '.join(_capabilities(repo, proto))
641
644
642 @wireprotocommand('changegroup', 'roots')
645 @wireprotocommand('changegroup', 'roots')
643 def changegroup(repo, proto, roots):
646 def changegroup(repo, proto, roots):
644 nodes = decodelist(roots)
647 nodes = decodelist(roots)
645 cg = changegroupmod.changegroup(repo, nodes, 'serve')
648 cg = changegroupmod.changegroup(repo, nodes, 'serve')
646 return streamres(proto.groupchunks(cg))
649 return streamres(proto.groupchunks(cg))
647
650
648 @wireprotocommand('changegroupsubset', 'bases heads')
651 @wireprotocommand('changegroupsubset', 'bases heads')
649 def changegroupsubset(repo, proto, bases, heads):
652 def changegroupsubset(repo, proto, bases, heads):
650 bases = decodelist(bases)
653 bases = decodelist(bases)
651 heads = decodelist(heads)
654 heads = decodelist(heads)
652 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
655 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
653 return streamres(proto.groupchunks(cg))
656 return streamres(proto.groupchunks(cg))
654
657
655 @wireprotocommand('debugwireargs', 'one two *')
658 @wireprotocommand('debugwireargs', 'one two *')
656 def debugwireargs(repo, proto, one, two, others):
659 def debugwireargs(repo, proto, one, two, others):
657 # only accept optional args from the known set
660 # only accept optional args from the known set
658 opts = options('debugwireargs', ['three', 'four'], others)
661 opts = options('debugwireargs', ['three', 'four'], others)
659 return repo.debugwireargs(one, two, **opts)
662 return repo.debugwireargs(one, two, **opts)
660
663
661 # List of options accepted by getbundle.
664 # List of options accepted by getbundle.
662 #
665 #
663 # Meant to be extended by extensions. It is the extension's responsibility to
666 # Meant to be extended by extensions. It is the extension's responsibility to
664 # ensure such options are properly processed in exchange.getbundle.
667 # ensure such options are properly processed in exchange.getbundle.
665 gboptslist = ['heads', 'common', 'bundlecaps']
668 gboptslist = ['heads', 'common', 'bundlecaps']
666
669
667 @wireprotocommand('getbundle', '*')
670 @wireprotocommand('getbundle', '*')
668 def getbundle(repo, proto, others):
671 def getbundle(repo, proto, others):
669 opts = options('getbundle', gboptsmap.keys(), others)
672 opts = options('getbundle', gboptsmap.keys(), others)
670 for k, v in opts.iteritems():
673 for k, v in opts.iteritems():
671 keytype = gboptsmap[k]
674 keytype = gboptsmap[k]
672 if keytype == 'nodes':
675 if keytype == 'nodes':
673 opts[k] = decodelist(v)
676 opts[k] = decodelist(v)
674 elif keytype == 'csv':
677 elif keytype == 'csv':
675 opts[k] = list(v.split(','))
678 opts[k] = list(v.split(','))
676 elif keytype == 'scsv':
679 elif keytype == 'scsv':
677 opts[k] = set(v.split(','))
680 opts[k] = set(v.split(','))
678 elif keytype == 'boolean':
681 elif keytype == 'boolean':
679 # Client should serialize False as '0', which is a non-empty string
682 # Client should serialize False as '0', which is a non-empty string
680 # so it evaluates as a True bool.
683 # so it evaluates as a True bool.
681 if v == '0':
684 if v == '0':
682 opts[k] = False
685 opts[k] = False
683 else:
686 else:
684 opts[k] = bool(v)
687 opts[k] = bool(v)
685 elif keytype != 'plain':
688 elif keytype != 'plain':
686 raise KeyError('unknown getbundle option type %s'
689 raise KeyError('unknown getbundle option type %s'
687 % keytype)
690 % keytype)
688
691
689 if not bundle1allowed(repo, 'pull'):
692 if not bundle1allowed(repo, 'pull'):
690 if not exchange.bundle2requested(opts.get('bundlecaps')):
693 if not exchange.bundle2requested(opts.get('bundlecaps')):
691 return ooberror(bundle2required)
694 return ooberror(bundle2required)
692
695
693 cg = exchange.getbundle(repo, 'serve', **opts)
696 cg = exchange.getbundle(repo, 'serve', **opts)
694 return streamres(proto.groupchunks(cg))
697 return streamres(proto.groupchunks(cg))
695
698
696 @wireprotocommand('heads')
699 @wireprotocommand('heads')
697 def heads(repo, proto):
700 def heads(repo, proto):
698 h = repo.heads()
701 h = repo.heads()
699 return encodelist(h) + "\n"
702 return encodelist(h) + "\n"
700
703
701 @wireprotocommand('hello')
704 @wireprotocommand('hello')
702 def hello(repo, proto):
705 def hello(repo, proto):
703 '''the hello command returns a set of lines describing various
706 '''the hello command returns a set of lines describing various
704 interesting things about the server, in an RFC822-like format.
707 interesting things about the server, in an RFC822-like format.
705 Currently the only one defined is "capabilities", which
708 Currently the only one defined is "capabilities", which
706 consists of a line in the form:
709 consists of a line in the form:
707
710
708 capabilities: space separated list of tokens
711 capabilities: space separated list of tokens
709 '''
712 '''
710 return "capabilities: %s\n" % (capabilities(repo, proto))
713 return "capabilities: %s\n" % (capabilities(repo, proto))
711
714
712 @wireprotocommand('listkeys', 'namespace')
715 @wireprotocommand('listkeys', 'namespace')
713 def listkeys(repo, proto, namespace):
716 def listkeys(repo, proto, namespace):
714 d = repo.listkeys(encoding.tolocal(namespace)).items()
717 d = repo.listkeys(encoding.tolocal(namespace)).items()
715 return pushkeymod.encodekeys(d)
718 return pushkeymod.encodekeys(d)
716
719
717 @wireprotocommand('lookup', 'key')
720 @wireprotocommand('lookup', 'key')
718 def lookup(repo, proto, key):
721 def lookup(repo, proto, key):
719 try:
722 try:
720 k = encoding.tolocal(key)
723 k = encoding.tolocal(key)
721 c = repo[k]
724 c = repo[k]
722 r = c.hex()
725 r = c.hex()
723 success = 1
726 success = 1
724 except Exception as inst:
727 except Exception as inst:
725 r = str(inst)
728 r = str(inst)
726 success = 0
729 success = 0
727 return "%s %s\n" % (success, r)
730 return "%s %s\n" % (success, r)
728
731
729 @wireprotocommand('known', 'nodes *')
732 @wireprotocommand('known', 'nodes *')
730 def known(repo, proto, nodes, others):
733 def known(repo, proto, nodes, others):
731 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
734 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
732
735
733 @wireprotocommand('pushkey', 'namespace key old new')
736 @wireprotocommand('pushkey', 'namespace key old new')
734 def pushkey(repo, proto, namespace, key, old, new):
737 def pushkey(repo, proto, namespace, key, old, new):
735 # compatibility with pre-1.8 clients which were accidentally
738 # compatibility with pre-1.8 clients which were accidentally
736 # sending raw binary nodes rather than utf-8-encoded hex
739 # sending raw binary nodes rather than utf-8-encoded hex
737 if len(new) == 20 and new.encode('string-escape') != new:
740 if len(new) == 20 and new.encode('string-escape') != new:
738 # looks like it could be a binary node
741 # looks like it could be a binary node
739 try:
742 try:
740 new.decode('utf-8')
743 new.decode('utf-8')
741 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
744 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
742 except UnicodeDecodeError:
745 except UnicodeDecodeError:
743 pass # binary, leave unmodified
746 pass # binary, leave unmodified
744 else:
747 else:
745 new = encoding.tolocal(new) # normal path
748 new = encoding.tolocal(new) # normal path
746
749
747 if util.safehasattr(proto, 'restore'):
750 if util.safehasattr(proto, 'restore'):
748
751
749 proto.redirect()
752 proto.redirect()
750
753
751 try:
754 try:
752 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
755 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
753 encoding.tolocal(old), new) or False
756 encoding.tolocal(old), new) or False
754 except error.Abort:
757 except error.Abort:
755 r = False
758 r = False
756
759
757 output = proto.restore()
760 output = proto.restore()
758
761
759 return '%s\n%s' % (int(r), output)
762 return '%s\n%s' % (int(r), output)
760
763
761 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
764 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
762 encoding.tolocal(old), new)
765 encoding.tolocal(old), new)
763 return '%s\n' % int(r)
766 return '%s\n' % int(r)
764
767
765 @wireprotocommand('stream_out')
768 @wireprotocommand('stream_out')
766 def stream(repo, proto):
769 def stream(repo, proto):
767 '''If the server supports streaming clone, it advertises the "stream"
770 '''If the server supports streaming clone, it advertises the "stream"
768 capability with a value representing the version and flags of the repo
771 capability with a value representing the version and flags of the repo
769 it is serving. Client checks to see if it understands the format.
772 it is serving. Client checks to see if it understands the format.
770 '''
773 '''
771 if not streamclone.allowservergeneration(repo.ui):
774 if not streamclone.allowservergeneration(repo.ui):
772 return '1\n'
775 return '1\n'
773
776
774 def getstream(it):
777 def getstream(it):
775 yield '0\n'
778 yield '0\n'
776 for chunk in it:
779 for chunk in it:
777 yield chunk
780 yield chunk
778
781
779 try:
782 try:
780 # LockError may be raised before the first result is yielded. Don't
783 # LockError may be raised before the first result is yielded. Don't
781 # emit output until we're sure we got the lock successfully.
784 # emit output until we're sure we got the lock successfully.
782 it = streamclone.generatev1wireproto(repo)
785 it = streamclone.generatev1wireproto(repo)
783 return streamres(getstream(it))
786 return streamres(getstream(it))
784 except error.LockError:
787 except error.LockError:
785 return '2\n'
788 return '2\n'
786
789
787 @wireprotocommand('unbundle', 'heads')
790 @wireprotocommand('unbundle', 'heads')
788 def unbundle(repo, proto, heads):
791 def unbundle(repo, proto, heads):
789 their_heads = decodelist(heads)
792 their_heads = decodelist(heads)
790
793
791 try:
794 try:
792 proto.redirect()
795 proto.redirect()
793
796
794 exchange.check_heads(repo, their_heads, 'preparing changes')
797 exchange.check_heads(repo, their_heads, 'preparing changes')
795
798
796 # write bundle data to temporary file because it can be big
799 # write bundle data to temporary file because it can be big
797 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
800 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
798 fp = os.fdopen(fd, 'wb+')
801 fp = os.fdopen(fd, 'wb+')
799 r = 0
802 r = 0
800 try:
803 try:
801 proto.getfile(fp)
804 proto.getfile(fp)
802 fp.seek(0)
805 fp.seek(0)
803 gen = exchange.readbundle(repo.ui, fp, None)
806 gen = exchange.readbundle(repo.ui, fp, None)
804 if (isinstance(gen, changegroupmod.cg1unpacker)
807 if (isinstance(gen, changegroupmod.cg1unpacker)
805 and not bundle1allowed(repo, 'push')):
808 and not bundle1allowed(repo, 'push')):
806 return ooberror(bundle2required)
809 return ooberror(bundle2required)
807
810
808 r = exchange.unbundle(repo, gen, their_heads, 'serve',
811 r = exchange.unbundle(repo, gen, their_heads, 'serve',
809 proto._client())
812 proto._client())
810 if util.safehasattr(r, 'addpart'):
813 if util.safehasattr(r, 'addpart'):
811 # The return looks streamable, we are in the bundle2 case and
814 # The return looks streamable, we are in the bundle2 case and
812 # should return a stream.
815 # should return a stream.
813 return streamres(r.getchunks())
816 return streamres(r.getchunks())
814 return pushres(r)
817 return pushres(r)
815
818
816 finally:
819 finally:
817 fp.close()
820 fp.close()
818 os.unlink(tempname)
821 os.unlink(tempname)
819
822
820 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
823 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
821 # handle non-bundle2 case first
824 # handle non-bundle2 case first
822 if not getattr(exc, 'duringunbundle2', False):
825 if not getattr(exc, 'duringunbundle2', False):
823 try:
826 try:
824 raise
827 raise
825 except error.Abort:
828 except error.Abort:
826 # The old code we moved used sys.stderr directly.
829 # The old code we moved used sys.stderr directly.
827 # We did not change it to minimise code change.
830 # We did not change it to minimise code change.
828 # This need to be moved to something proper.
831 # This need to be moved to something proper.
829 # Feel free to do it.
832 # Feel free to do it.
830 sys.stderr.write("abort: %s\n" % exc)
833 sys.stderr.write("abort: %s\n" % exc)
831 return pushres(0)
834 return pushres(0)
832 except error.PushRaced:
835 except error.PushRaced:
833 return pusherr(str(exc))
836 return pusherr(str(exc))
834
837
835 bundler = bundle2.bundle20(repo.ui)
838 bundler = bundle2.bundle20(repo.ui)
836 for out in getattr(exc, '_bundle2salvagedoutput', ()):
839 for out in getattr(exc, '_bundle2salvagedoutput', ()):
837 bundler.addpart(out)
840 bundler.addpart(out)
838 try:
841 try:
839 try:
842 try:
840 raise
843 raise
841 except error.PushkeyFailed as exc:
844 except error.PushkeyFailed as exc:
842 # check client caps
845 # check client caps
843 remotecaps = getattr(exc, '_replycaps', None)
846 remotecaps = getattr(exc, '_replycaps', None)
844 if (remotecaps is not None
847 if (remotecaps is not None
845 and 'pushkey' not in remotecaps.get('error', ())):
848 and 'pushkey' not in remotecaps.get('error', ())):
846 # no support remote side, fallback to Abort handler.
849 # no support remote side, fallback to Abort handler.
847 raise
850 raise
848 part = bundler.newpart('error:pushkey')
851 part = bundler.newpart('error:pushkey')
849 part.addparam('in-reply-to', exc.partid)
852 part.addparam('in-reply-to', exc.partid)
850 if exc.namespace is not None:
853 if exc.namespace is not None:
851 part.addparam('namespace', exc.namespace, mandatory=False)
854 part.addparam('namespace', exc.namespace, mandatory=False)
852 if exc.key is not None:
855 if exc.key is not None:
853 part.addparam('key', exc.key, mandatory=False)
856 part.addparam('key', exc.key, mandatory=False)
854 if exc.new is not None:
857 if exc.new is not None:
855 part.addparam('new', exc.new, mandatory=False)
858 part.addparam('new', exc.new, mandatory=False)
856 if exc.old is not None:
859 if exc.old is not None:
857 part.addparam('old', exc.old, mandatory=False)
860 part.addparam('old', exc.old, mandatory=False)
858 if exc.ret is not None:
861 if exc.ret is not None:
859 part.addparam('ret', exc.ret, mandatory=False)
862 part.addparam('ret', exc.ret, mandatory=False)
860 except error.BundleValueError as exc:
863 except error.BundleValueError as exc:
861 errpart = bundler.newpart('error:unsupportedcontent')
864 errpart = bundler.newpart('error:unsupportedcontent')
862 if exc.parttype is not None:
865 if exc.parttype is not None:
863 errpart.addparam('parttype', exc.parttype)
866 errpart.addparam('parttype', exc.parttype)
864 if exc.params:
867 if exc.params:
865 errpart.addparam('params', '\0'.join(exc.params))
868 errpart.addparam('params', '\0'.join(exc.params))
866 except error.Abort as exc:
869 except error.Abort as exc:
867 manargs = [('message', str(exc))]
870 manargs = [('message', str(exc))]
868 advargs = []
871 advargs = []
869 if exc.hint is not None:
872 if exc.hint is not None:
870 advargs.append(('hint', exc.hint))
873 advargs.append(('hint', exc.hint))
871 bundler.addpart(bundle2.bundlepart('error:abort',
874 bundler.addpart(bundle2.bundlepart('error:abort',
872 manargs, advargs))
875 manargs, advargs))
873 except error.PushRaced as exc:
876 except error.PushRaced as exc:
874 bundler.newpart('error:pushraced', [('message', str(exc))])
877 bundler.newpart('error:pushraced', [('message', str(exc))])
875 return streamres(bundler.getchunks())
878 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now