##// END OF EJS Templates
stream: sort stream capability before serialisation...
Pierre-Yves David -
r26911:d7e5e4da default
parent child Browse files
Show More
@@ -1,827 +1,827 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 from __future__ import absolute_import
8 from __future__ import absolute_import
9
9
10 import os
10 import os
11 import sys
11 import sys
12 import tempfile
12 import tempfile
13 import urllib
13 import urllib
14
14
15 from .i18n import _
15 from .i18n import _
16 from .node import (
16 from .node import (
17 bin,
17 bin,
18 hex,
18 hex,
19 )
19 )
20
20
21 from . import (
21 from . import (
22 bundle2,
22 bundle2,
23 changegroup as changegroupmod,
23 changegroup as changegroupmod,
24 encoding,
24 encoding,
25 error,
25 error,
26 exchange,
26 exchange,
27 peer,
27 peer,
28 pushkey as pushkeymod,
28 pushkey as pushkeymod,
29 streamclone,
29 streamclone,
30 util,
30 util,
31 )
31 )
32
32
33 class abstractserverproto(object):
33 class abstractserverproto(object):
34 """abstract class that summarizes the protocol API
34 """abstract class that summarizes the protocol API
35
35
36 Used as reference and documentation.
36 Used as reference and documentation.
37 """
37 """
38
38
39 def getargs(self, args):
39 def getargs(self, args):
40 """return the value for arguments in <args>
40 """return the value for arguments in <args>
41
41
42 returns a list of values (same order as <args>)"""
42 returns a list of values (same order as <args>)"""
43 raise NotImplementedError()
43 raise NotImplementedError()
44
44
45 def getfile(self, fp):
45 def getfile(self, fp):
46 """write the whole content of a file into a file like object
46 """write the whole content of a file into a file like object
47
47
48 The file is in the form::
48 The file is in the form::
49
49
50 (<chunk-size>\n<chunk>)+0\n
50 (<chunk-size>\n<chunk>)+0\n
51
51
52 chunk size is the ascii version of the int.
52 chunk size is the ascii version of the int.
53 """
53 """
54 raise NotImplementedError()
54 raise NotImplementedError()
55
55
56 def redirect(self):
56 def redirect(self):
57 """may setup interception for stdout and stderr
57 """may setup interception for stdout and stderr
58
58
59 See also the `restore` method."""
59 See also the `restore` method."""
60 raise NotImplementedError()
60 raise NotImplementedError()
61
61
62 # If the `redirect` function does install interception, the `restore`
62 # If the `redirect` function does install interception, the `restore`
63 # function MUST be defined. If interception is not used, this function
63 # function MUST be defined. If interception is not used, this function
64 # MUST NOT be defined.
64 # MUST NOT be defined.
65 #
65 #
66 # left commented here on purpose
66 # left commented here on purpose
67 #
67 #
68 #def restore(self):
68 #def restore(self):
69 # """reinstall previous stdout and stderr and return intercepted stdout
69 # """reinstall previous stdout and stderr and return intercepted stdout
70 # """
70 # """
71 # raise NotImplementedError()
71 # raise NotImplementedError()
72
72
73 def groupchunks(self, cg):
73 def groupchunks(self, cg):
74 """return 4096 chunks from a changegroup object
74 """return 4096 chunks from a changegroup object
75
75
76 Some protocols may have compressed the contents."""
76 Some protocols may have compressed the contents."""
77 raise NotImplementedError()
77 raise NotImplementedError()
78
78
79 class remotebatch(peer.batcher):
79 class remotebatch(peer.batcher):
80 '''batches the queued calls; uses as few roundtrips as possible'''
80 '''batches the queued calls; uses as few roundtrips as possible'''
81 def __init__(self, remote):
81 def __init__(self, remote):
82 '''remote must support _submitbatch(encbatch) and
82 '''remote must support _submitbatch(encbatch) and
83 _submitone(op, encargs)'''
83 _submitone(op, encargs)'''
84 peer.batcher.__init__(self)
84 peer.batcher.__init__(self)
85 self.remote = remote
85 self.remote = remote
86 def submit(self):
86 def submit(self):
87 req, rsp = [], []
87 req, rsp = [], []
88 for name, args, opts, resref in self.calls:
88 for name, args, opts, resref in self.calls:
89 mtd = getattr(self.remote, name)
89 mtd = getattr(self.remote, name)
90 batchablefn = getattr(mtd, 'batchable', None)
90 batchablefn = getattr(mtd, 'batchable', None)
91 if batchablefn is not None:
91 if batchablefn is not None:
92 batchable = batchablefn(mtd.im_self, *args, **opts)
92 batchable = batchablefn(mtd.im_self, *args, **opts)
93 encargsorres, encresref = batchable.next()
93 encargsorres, encresref = batchable.next()
94 if encresref:
94 if encresref:
95 req.append((name, encargsorres,))
95 req.append((name, encargsorres,))
96 rsp.append((batchable, encresref, resref,))
96 rsp.append((batchable, encresref, resref,))
97 else:
97 else:
98 resref.set(encargsorres)
98 resref.set(encargsorres)
99 else:
99 else:
100 if req:
100 if req:
101 self._submitreq(req, rsp)
101 self._submitreq(req, rsp)
102 req, rsp = [], []
102 req, rsp = [], []
103 resref.set(mtd(*args, **opts))
103 resref.set(mtd(*args, **opts))
104 if req:
104 if req:
105 self._submitreq(req, rsp)
105 self._submitreq(req, rsp)
106 def _submitreq(self, req, rsp):
106 def _submitreq(self, req, rsp):
107 encresults = self.remote._submitbatch(req)
107 encresults = self.remote._submitbatch(req)
108 for encres, r in zip(encresults, rsp):
108 for encres, r in zip(encresults, rsp):
109 batchable, encresref, resref = r
109 batchable, encresref, resref = r
110 encresref.set(encres)
110 encresref.set(encres)
111 resref.set(batchable.next())
111 resref.set(batchable.next())
112
112
113 # Forward a couple of names from peer to make wireproto interactions
113 # Forward a couple of names from peer to make wireproto interactions
114 # slightly more sensible.
114 # slightly more sensible.
115 batchable = peer.batchable
115 batchable = peer.batchable
116 future = peer.future
116 future = peer.future
117
117
118 # list of nodes encoding / decoding
118 # list of nodes encoding / decoding
119
119
120 def decodelist(l, sep=' '):
120 def decodelist(l, sep=' '):
121 if l:
121 if l:
122 return map(bin, l.split(sep))
122 return map(bin, l.split(sep))
123 return []
123 return []
124
124
125 def encodelist(l, sep=' '):
125 def encodelist(l, sep=' '):
126 try:
126 try:
127 return sep.join(map(hex, l))
127 return sep.join(map(hex, l))
128 except TypeError:
128 except TypeError:
129 raise
129 raise
130
130
131 # batched call argument encoding
131 # batched call argument encoding
132
132
133 def escapearg(plain):
133 def escapearg(plain):
134 return (plain
134 return (plain
135 .replace(':', ':c')
135 .replace(':', ':c')
136 .replace(',', ':o')
136 .replace(',', ':o')
137 .replace(';', ':s')
137 .replace(';', ':s')
138 .replace('=', ':e'))
138 .replace('=', ':e'))
139
139
140 def unescapearg(escaped):
140 def unescapearg(escaped):
141 return (escaped
141 return (escaped
142 .replace(':e', '=')
142 .replace(':e', '=')
143 .replace(':s', ';')
143 .replace(':s', ';')
144 .replace(':o', ',')
144 .replace(':o', ',')
145 .replace(':c', ':'))
145 .replace(':c', ':'))
146
146
147 # mapping of options accepted by getbundle and their types
147 # mapping of options accepted by getbundle and their types
148 #
148 #
149 # Meant to be extended by extensions. It is extensions responsibility to ensure
149 # Meant to be extended by extensions. It is extensions responsibility to ensure
150 # such options are properly processed in exchange.getbundle.
150 # such options are properly processed in exchange.getbundle.
151 #
151 #
152 # supported types are:
152 # supported types are:
153 #
153 #
154 # :nodes: list of binary nodes
154 # :nodes: list of binary nodes
155 # :csv: list of comma-separated values
155 # :csv: list of comma-separated values
156 # :scsv: list of comma-separated values return as set
156 # :scsv: list of comma-separated values return as set
157 # :plain: string with no transformation needed.
157 # :plain: string with no transformation needed.
158 gboptsmap = {'heads': 'nodes',
158 gboptsmap = {'heads': 'nodes',
159 'common': 'nodes',
159 'common': 'nodes',
160 'obsmarkers': 'boolean',
160 'obsmarkers': 'boolean',
161 'bundlecaps': 'scsv',
161 'bundlecaps': 'scsv',
162 'listkeys': 'csv',
162 'listkeys': 'csv',
163 'cg': 'boolean',
163 'cg': 'boolean',
164 'cbattempted': 'boolean'}
164 'cbattempted': 'boolean'}
165
165
166 # client side
166 # client side
167
167
168 class wirepeer(peer.peerrepository):
168 class wirepeer(peer.peerrepository):
169
169
170 def batch(self):
170 def batch(self):
171 if self.capable('batch'):
171 if self.capable('batch'):
172 return remotebatch(self)
172 return remotebatch(self)
173 else:
173 else:
174 return peer.localbatch(self)
174 return peer.localbatch(self)
175 def _submitbatch(self, req):
175 def _submitbatch(self, req):
176 cmds = []
176 cmds = []
177 for op, argsdict in req:
177 for op, argsdict in req:
178 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
178 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
179 for k, v in argsdict.iteritems())
179 for k, v in argsdict.iteritems())
180 cmds.append('%s %s' % (op, args))
180 cmds.append('%s %s' % (op, args))
181 rsp = self._call("batch", cmds=';'.join(cmds))
181 rsp = self._call("batch", cmds=';'.join(cmds))
182 return [unescapearg(r) for r in rsp.split(';')]
182 return [unescapearg(r) for r in rsp.split(';')]
183 def _submitone(self, op, args):
183 def _submitone(self, op, args):
184 return self._call(op, **args)
184 return self._call(op, **args)
185
185
186 @batchable
186 @batchable
187 def lookup(self, key):
187 def lookup(self, key):
188 self.requirecap('lookup', _('look up remote revision'))
188 self.requirecap('lookup', _('look up remote revision'))
189 f = future()
189 f = future()
190 yield {'key': encoding.fromlocal(key)}, f
190 yield {'key': encoding.fromlocal(key)}, f
191 d = f.value
191 d = f.value
192 success, data = d[:-1].split(" ", 1)
192 success, data = d[:-1].split(" ", 1)
193 if int(success):
193 if int(success):
194 yield bin(data)
194 yield bin(data)
195 self._abort(error.RepoError(data))
195 self._abort(error.RepoError(data))
196
196
197 @batchable
197 @batchable
198 def heads(self):
198 def heads(self):
199 f = future()
199 f = future()
200 yield {}, f
200 yield {}, f
201 d = f.value
201 d = f.value
202 try:
202 try:
203 yield decodelist(d[:-1])
203 yield decodelist(d[:-1])
204 except ValueError:
204 except ValueError:
205 self._abort(error.ResponseError(_("unexpected response:"), d))
205 self._abort(error.ResponseError(_("unexpected response:"), d))
206
206
207 @batchable
207 @batchable
208 def known(self, nodes):
208 def known(self, nodes):
209 f = future()
209 f = future()
210 yield {'nodes': encodelist(nodes)}, f
210 yield {'nodes': encodelist(nodes)}, f
211 d = f.value
211 d = f.value
212 try:
212 try:
213 yield [bool(int(b)) for b in d]
213 yield [bool(int(b)) for b in d]
214 except ValueError:
214 except ValueError:
215 self._abort(error.ResponseError(_("unexpected response:"), d))
215 self._abort(error.ResponseError(_("unexpected response:"), d))
216
216
217 @batchable
217 @batchable
218 def branchmap(self):
218 def branchmap(self):
219 f = future()
219 f = future()
220 yield {}, f
220 yield {}, f
221 d = f.value
221 d = f.value
222 try:
222 try:
223 branchmap = {}
223 branchmap = {}
224 for branchpart in d.splitlines():
224 for branchpart in d.splitlines():
225 branchname, branchheads = branchpart.split(' ', 1)
225 branchname, branchheads = branchpart.split(' ', 1)
226 branchname = encoding.tolocal(urllib.unquote(branchname))
226 branchname = encoding.tolocal(urllib.unquote(branchname))
227 branchheads = decodelist(branchheads)
227 branchheads = decodelist(branchheads)
228 branchmap[branchname] = branchheads
228 branchmap[branchname] = branchheads
229 yield branchmap
229 yield branchmap
230 except TypeError:
230 except TypeError:
231 self._abort(error.ResponseError(_("unexpected response:"), d))
231 self._abort(error.ResponseError(_("unexpected response:"), d))
232
232
233 def branches(self, nodes):
233 def branches(self, nodes):
234 n = encodelist(nodes)
234 n = encodelist(nodes)
235 d = self._call("branches", nodes=n)
235 d = self._call("branches", nodes=n)
236 try:
236 try:
237 br = [tuple(decodelist(b)) for b in d.splitlines()]
237 br = [tuple(decodelist(b)) for b in d.splitlines()]
238 return br
238 return br
239 except ValueError:
239 except ValueError:
240 self._abort(error.ResponseError(_("unexpected response:"), d))
240 self._abort(error.ResponseError(_("unexpected response:"), d))
241
241
242 def between(self, pairs):
242 def between(self, pairs):
243 batch = 8 # avoid giant requests
243 batch = 8 # avoid giant requests
244 r = []
244 r = []
245 for i in xrange(0, len(pairs), batch):
245 for i in xrange(0, len(pairs), batch):
246 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
246 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
247 d = self._call("between", pairs=n)
247 d = self._call("between", pairs=n)
248 try:
248 try:
249 r.extend(l and decodelist(l) or [] for l in d.splitlines())
249 r.extend(l and decodelist(l) or [] for l in d.splitlines())
250 except ValueError:
250 except ValueError:
251 self._abort(error.ResponseError(_("unexpected response:"), d))
251 self._abort(error.ResponseError(_("unexpected response:"), d))
252 return r
252 return r
253
253
254 @batchable
254 @batchable
255 def pushkey(self, namespace, key, old, new):
255 def pushkey(self, namespace, key, old, new):
256 if not self.capable('pushkey'):
256 if not self.capable('pushkey'):
257 yield False, None
257 yield False, None
258 f = future()
258 f = future()
259 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
259 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
260 yield {'namespace': encoding.fromlocal(namespace),
260 yield {'namespace': encoding.fromlocal(namespace),
261 'key': encoding.fromlocal(key),
261 'key': encoding.fromlocal(key),
262 'old': encoding.fromlocal(old),
262 'old': encoding.fromlocal(old),
263 'new': encoding.fromlocal(new)}, f
263 'new': encoding.fromlocal(new)}, f
264 d = f.value
264 d = f.value
265 d, output = d.split('\n', 1)
265 d, output = d.split('\n', 1)
266 try:
266 try:
267 d = bool(int(d))
267 d = bool(int(d))
268 except ValueError:
268 except ValueError:
269 raise error.ResponseError(
269 raise error.ResponseError(
270 _('push failed (unexpected response):'), d)
270 _('push failed (unexpected response):'), d)
271 for l in output.splitlines(True):
271 for l in output.splitlines(True):
272 self.ui.status(_('remote: '), l)
272 self.ui.status(_('remote: '), l)
273 yield d
273 yield d
274
274
275 @batchable
275 @batchable
276 def listkeys(self, namespace):
276 def listkeys(self, namespace):
277 if not self.capable('pushkey'):
277 if not self.capable('pushkey'):
278 yield {}, None
278 yield {}, None
279 f = future()
279 f = future()
280 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
280 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
281 yield {'namespace': encoding.fromlocal(namespace)}, f
281 yield {'namespace': encoding.fromlocal(namespace)}, f
282 d = f.value
282 d = f.value
283 self.ui.debug('received listkey for "%s": %i bytes\n'
283 self.ui.debug('received listkey for "%s": %i bytes\n'
284 % (namespace, len(d)))
284 % (namespace, len(d)))
285 yield pushkeymod.decodekeys(d)
285 yield pushkeymod.decodekeys(d)
286
286
287 def stream_out(self):
287 def stream_out(self):
288 return self._callstream('stream_out')
288 return self._callstream('stream_out')
289
289
290 def changegroup(self, nodes, kind):
290 def changegroup(self, nodes, kind):
291 n = encodelist(nodes)
291 n = encodelist(nodes)
292 f = self._callcompressable("changegroup", roots=n)
292 f = self._callcompressable("changegroup", roots=n)
293 return changegroupmod.cg1unpacker(f, 'UN')
293 return changegroupmod.cg1unpacker(f, 'UN')
294
294
295 def changegroupsubset(self, bases, heads, kind):
295 def changegroupsubset(self, bases, heads, kind):
296 self.requirecap('changegroupsubset', _('look up remote changes'))
296 self.requirecap('changegroupsubset', _('look up remote changes'))
297 bases = encodelist(bases)
297 bases = encodelist(bases)
298 heads = encodelist(heads)
298 heads = encodelist(heads)
299 f = self._callcompressable("changegroupsubset",
299 f = self._callcompressable("changegroupsubset",
300 bases=bases, heads=heads)
300 bases=bases, heads=heads)
301 return changegroupmod.cg1unpacker(f, 'UN')
301 return changegroupmod.cg1unpacker(f, 'UN')
302
302
303 def getbundle(self, source, **kwargs):
303 def getbundle(self, source, **kwargs):
304 self.requirecap('getbundle', _('look up remote changes'))
304 self.requirecap('getbundle', _('look up remote changes'))
305 opts = {}
305 opts = {}
306 bundlecaps = kwargs.get('bundlecaps')
306 bundlecaps = kwargs.get('bundlecaps')
307 if bundlecaps is not None:
307 if bundlecaps is not None:
308 kwargs['bundlecaps'] = sorted(bundlecaps)
308 kwargs['bundlecaps'] = sorted(bundlecaps)
309 else:
309 else:
310 bundlecaps = () # kwargs could have it to None
310 bundlecaps = () # kwargs could have it to None
311 for key, value in kwargs.iteritems():
311 for key, value in kwargs.iteritems():
312 if value is None:
312 if value is None:
313 continue
313 continue
314 keytype = gboptsmap.get(key)
314 keytype = gboptsmap.get(key)
315 if keytype is None:
315 if keytype is None:
316 assert False, 'unexpected'
316 assert False, 'unexpected'
317 elif keytype == 'nodes':
317 elif keytype == 'nodes':
318 value = encodelist(value)
318 value = encodelist(value)
319 elif keytype in ('csv', 'scsv'):
319 elif keytype in ('csv', 'scsv'):
320 value = ','.join(value)
320 value = ','.join(value)
321 elif keytype == 'boolean':
321 elif keytype == 'boolean':
322 value = '%i' % bool(value)
322 value = '%i' % bool(value)
323 elif keytype != 'plain':
323 elif keytype != 'plain':
324 raise KeyError('unknown getbundle option type %s'
324 raise KeyError('unknown getbundle option type %s'
325 % keytype)
325 % keytype)
326 opts[key] = value
326 opts[key] = value
327 f = self._callcompressable("getbundle", **opts)
327 f = self._callcompressable("getbundle", **opts)
328 if any((cap.startswith('HG2') for cap in bundlecaps)):
328 if any((cap.startswith('HG2') for cap in bundlecaps)):
329 return bundle2.getunbundler(self.ui, f)
329 return bundle2.getunbundler(self.ui, f)
330 else:
330 else:
331 return changegroupmod.cg1unpacker(f, 'UN')
331 return changegroupmod.cg1unpacker(f, 'UN')
332
332
333 def unbundle(self, cg, heads, source):
333 def unbundle(self, cg, heads, source):
334 '''Send cg (a readable file-like object representing the
334 '''Send cg (a readable file-like object representing the
335 changegroup to push, typically a chunkbuffer object) to the
335 changegroup to push, typically a chunkbuffer object) to the
336 remote server as a bundle.
336 remote server as a bundle.
337
337
338 When pushing a bundle10 stream, return an integer indicating the
338 When pushing a bundle10 stream, return an integer indicating the
339 result of the push (see localrepository.addchangegroup()).
339 result of the push (see localrepository.addchangegroup()).
340
340
341 When pushing a bundle20 stream, return a bundle20 stream.'''
341 When pushing a bundle20 stream, return a bundle20 stream.'''
342
342
343 if heads != ['force'] and self.capable('unbundlehash'):
343 if heads != ['force'] and self.capable('unbundlehash'):
344 heads = encodelist(['hashed',
344 heads = encodelist(['hashed',
345 util.sha1(''.join(sorted(heads))).digest()])
345 util.sha1(''.join(sorted(heads))).digest()])
346 else:
346 else:
347 heads = encodelist(heads)
347 heads = encodelist(heads)
348
348
349 if util.safehasattr(cg, 'deltaheader'):
349 if util.safehasattr(cg, 'deltaheader'):
350 # this a bundle10, do the old style call sequence
350 # this a bundle10, do the old style call sequence
351 ret, output = self._callpush("unbundle", cg, heads=heads)
351 ret, output = self._callpush("unbundle", cg, heads=heads)
352 if ret == "":
352 if ret == "":
353 raise error.ResponseError(
353 raise error.ResponseError(
354 _('push failed:'), output)
354 _('push failed:'), output)
355 try:
355 try:
356 ret = int(ret)
356 ret = int(ret)
357 except ValueError:
357 except ValueError:
358 raise error.ResponseError(
358 raise error.ResponseError(
359 _('push failed (unexpected response):'), ret)
359 _('push failed (unexpected response):'), ret)
360
360
361 for l in output.splitlines(True):
361 for l in output.splitlines(True):
362 self.ui.status(_('remote: '), l)
362 self.ui.status(_('remote: '), l)
363 else:
363 else:
364 # bundle2 push. Send a stream, fetch a stream.
364 # bundle2 push. Send a stream, fetch a stream.
365 stream = self._calltwowaystream('unbundle', cg, heads=heads)
365 stream = self._calltwowaystream('unbundle', cg, heads=heads)
366 ret = bundle2.getunbundler(self.ui, stream)
366 ret = bundle2.getunbundler(self.ui, stream)
367 return ret
367 return ret
368
368
369 def debugwireargs(self, one, two, three=None, four=None, five=None):
369 def debugwireargs(self, one, two, three=None, four=None, five=None):
370 # don't pass optional arguments left at their default value
370 # don't pass optional arguments left at their default value
371 opts = {}
371 opts = {}
372 if three is not None:
372 if three is not None:
373 opts['three'] = three
373 opts['three'] = three
374 if four is not None:
374 if four is not None:
375 opts['four'] = four
375 opts['four'] = four
376 return self._call('debugwireargs', one=one, two=two, **opts)
376 return self._call('debugwireargs', one=one, two=two, **opts)
377
377
378 def _call(self, cmd, **args):
378 def _call(self, cmd, **args):
379 """execute <cmd> on the server
379 """execute <cmd> on the server
380
380
381 The command is expected to return a simple string.
381 The command is expected to return a simple string.
382
382
383 returns the server reply as a string."""
383 returns the server reply as a string."""
384 raise NotImplementedError()
384 raise NotImplementedError()
385
385
386 def _callstream(self, cmd, **args):
386 def _callstream(self, cmd, **args):
387 """execute <cmd> on the server
387 """execute <cmd> on the server
388
388
389 The command is expected to return a stream.
389 The command is expected to return a stream.
390
390
391 returns the server reply as a file like object."""
391 returns the server reply as a file like object."""
392 raise NotImplementedError()
392 raise NotImplementedError()
393
393
394 def _callcompressable(self, cmd, **args):
394 def _callcompressable(self, cmd, **args):
395 """execute <cmd> on the server
395 """execute <cmd> on the server
396
396
397 The command is expected to return a stream.
397 The command is expected to return a stream.
398
398
399 The stream may have been compressed in some implementations. This
399 The stream may have been compressed in some implementations. This
400 function takes care of the decompression. This is the only difference
400 function takes care of the decompression. This is the only difference
401 with _callstream.
401 with _callstream.
402
402
403 returns the server reply as a file like object.
403 returns the server reply as a file like object.
404 """
404 """
405 raise NotImplementedError()
405 raise NotImplementedError()
406
406
407 def _callpush(self, cmd, fp, **args):
407 def _callpush(self, cmd, fp, **args):
408 """execute a <cmd> on server
408 """execute a <cmd> on server
409
409
410 The command is expected to be related to a push. Push has a special
410 The command is expected to be related to a push. Push has a special
411 return method.
411 return method.
412
412
413 returns the server reply as a (ret, output) tuple. ret is either
413 returns the server reply as a (ret, output) tuple. ret is either
414 empty (error) or a stringified int.
414 empty (error) or a stringified int.
415 """
415 """
416 raise NotImplementedError()
416 raise NotImplementedError()
417
417
418 def _calltwowaystream(self, cmd, fp, **args):
418 def _calltwowaystream(self, cmd, fp, **args):
419 """execute <cmd> on server
419 """execute <cmd> on server
420
420
421 The command will send a stream to the server and get a stream in reply.
421 The command will send a stream to the server and get a stream in reply.
422 """
422 """
423 raise NotImplementedError()
423 raise NotImplementedError()
424
424
425 def _abort(self, exception):
425 def _abort(self, exception):
426 """clearly abort the wire protocol connection and raise the exception
426 """clearly abort the wire protocol connection and raise the exception
427 """
427 """
428 raise NotImplementedError()
428 raise NotImplementedError()
429
429
430 # server side
430 # server side
431
431
432 # wire protocol command can either return a string or one of these classes.
432 # wire protocol command can either return a string or one of these classes.
433 class streamres(object):
433 class streamres(object):
434 """wireproto reply: binary stream
434 """wireproto reply: binary stream
435
435
436 The call was successful and the result is a stream.
436 The call was successful and the result is a stream.
437 Iterate on the `self.gen` attribute to retrieve chunks.
437 Iterate on the `self.gen` attribute to retrieve chunks.
438 """
438 """
439 def __init__(self, gen):
439 def __init__(self, gen):
440 self.gen = gen
440 self.gen = gen
441
441
442 class pushres(object):
442 class pushres(object):
443 """wireproto reply: success with simple integer return
443 """wireproto reply: success with simple integer return
444
444
445 The call was successful and returned an integer contained in `self.res`.
445 The call was successful and returned an integer contained in `self.res`.
446 """
446 """
447 def __init__(self, res):
447 def __init__(self, res):
448 self.res = res
448 self.res = res
449
449
450 class pusherr(object):
450 class pusherr(object):
451 """wireproto reply: failure
451 """wireproto reply: failure
452
452
453 The call failed. The `self.res` attribute contains the error message.
453 The call failed. The `self.res` attribute contains the error message.
454 """
454 """
455 def __init__(self, res):
455 def __init__(self, res):
456 self.res = res
456 self.res = res
457
457
458 class ooberror(object):
458 class ooberror(object):
459 """wireproto reply: failure of a batch of operation
459 """wireproto reply: failure of a batch of operation
460
460
461 Something failed during a batch call. The error message is stored in
461 Something failed during a batch call. The error message is stored in
462 `self.message`.
462 `self.message`.
463 """
463 """
464 def __init__(self, message):
464 def __init__(self, message):
465 self.message = message
465 self.message = message
466
466
467 def dispatch(repo, proto, command):
467 def dispatch(repo, proto, command):
468 repo = repo.filtered("served")
468 repo = repo.filtered("served")
469 func, spec = commands[command]
469 func, spec = commands[command]
470 args = proto.getargs(spec)
470 args = proto.getargs(spec)
471 return func(repo, proto, *args)
471 return func(repo, proto, *args)
472
472
473 def options(cmd, keys, others):
473 def options(cmd, keys, others):
474 opts = {}
474 opts = {}
475 for k in keys:
475 for k in keys:
476 if k in others:
476 if k in others:
477 opts[k] = others[k]
477 opts[k] = others[k]
478 del others[k]
478 del others[k]
479 if others:
479 if others:
480 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
480 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
481 % (cmd, ",".join(others)))
481 % (cmd, ",".join(others)))
482 return opts
482 return opts
483
483
484 # list of commands
484 # list of commands
485 commands = {}
485 commands = {}
486
486
487 def wireprotocommand(name, args=''):
487 def wireprotocommand(name, args=''):
488 """decorator for wire protocol command"""
488 """decorator for wire protocol command"""
489 def register(func):
489 def register(func):
490 commands[name] = (func, args)
490 commands[name] = (func, args)
491 return func
491 return func
492 return register
492 return register
493
493
494 @wireprotocommand('batch', 'cmds *')
494 @wireprotocommand('batch', 'cmds *')
495 def batch(repo, proto, cmds, others):
495 def batch(repo, proto, cmds, others):
496 repo = repo.filtered("served")
496 repo = repo.filtered("served")
497 res = []
497 res = []
498 for pair in cmds.split(';'):
498 for pair in cmds.split(';'):
499 op, args = pair.split(' ', 1)
499 op, args = pair.split(' ', 1)
500 vals = {}
500 vals = {}
501 for a in args.split(','):
501 for a in args.split(','):
502 if a:
502 if a:
503 n, v = a.split('=')
503 n, v = a.split('=')
504 vals[n] = unescapearg(v)
504 vals[n] = unescapearg(v)
505 func, spec = commands[op]
505 func, spec = commands[op]
506 if spec:
506 if spec:
507 keys = spec.split()
507 keys = spec.split()
508 data = {}
508 data = {}
509 for k in keys:
509 for k in keys:
510 if k == '*':
510 if k == '*':
511 star = {}
511 star = {}
512 for key in vals.keys():
512 for key in vals.keys():
513 if key not in keys:
513 if key not in keys:
514 star[key] = vals[key]
514 star[key] = vals[key]
515 data['*'] = star
515 data['*'] = star
516 else:
516 else:
517 data[k] = vals[k]
517 data[k] = vals[k]
518 result = func(repo, proto, *[data[k] for k in keys])
518 result = func(repo, proto, *[data[k] for k in keys])
519 else:
519 else:
520 result = func(repo, proto)
520 result = func(repo, proto)
521 if isinstance(result, ooberror):
521 if isinstance(result, ooberror):
522 return result
522 return result
523 res.append(escapearg(result))
523 res.append(escapearg(result))
524 return ';'.join(res)
524 return ';'.join(res)
525
525
526 @wireprotocommand('between', 'pairs')
526 @wireprotocommand('between', 'pairs')
527 def between(repo, proto, pairs):
527 def between(repo, proto, pairs):
528 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
528 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
529 r = []
529 r = []
530 for b in repo.between(pairs):
530 for b in repo.between(pairs):
531 r.append(encodelist(b) + "\n")
531 r.append(encodelist(b) + "\n")
532 return "".join(r)
532 return "".join(r)
533
533
534 @wireprotocommand('branchmap')
534 @wireprotocommand('branchmap')
535 def branchmap(repo, proto):
535 def branchmap(repo, proto):
536 branchmap = repo.branchmap()
536 branchmap = repo.branchmap()
537 heads = []
537 heads = []
538 for branch, nodes in branchmap.iteritems():
538 for branch, nodes in branchmap.iteritems():
539 branchname = urllib.quote(encoding.fromlocal(branch))
539 branchname = urllib.quote(encoding.fromlocal(branch))
540 branchnodes = encodelist(nodes)
540 branchnodes = encodelist(nodes)
541 heads.append('%s %s' % (branchname, branchnodes))
541 heads.append('%s %s' % (branchname, branchnodes))
542 return '\n'.join(heads)
542 return '\n'.join(heads)
543
543
544 @wireprotocommand('branches', 'nodes')
544 @wireprotocommand('branches', 'nodes')
545 def branches(repo, proto, nodes):
545 def branches(repo, proto, nodes):
546 nodes = decodelist(nodes)
546 nodes = decodelist(nodes)
547 r = []
547 r = []
548 for b in repo.branches(nodes):
548 for b in repo.branches(nodes):
549 r.append(encodelist(b) + "\n")
549 r.append(encodelist(b) + "\n")
550 return "".join(r)
550 return "".join(r)
551
551
552 @wireprotocommand('clonebundles', '')
552 @wireprotocommand('clonebundles', '')
553 def clonebundles(repo, proto):
553 def clonebundles(repo, proto):
554 """Server command for returning info for available bundles to seed clones.
554 """Server command for returning info for available bundles to seed clones.
555
555
556 Clients will parse this response and determine what bundle to fetch.
556 Clients will parse this response and determine what bundle to fetch.
557
557
558 Extensions may wrap this command to filter or dynamically emit data
558 Extensions may wrap this command to filter or dynamically emit data
559 depending on the request. e.g. you could advertise URLs for the closest
559 depending on the request. e.g. you could advertise URLs for the closest
560 data center given the client's IP address.
560 data center given the client's IP address.
561 """
561 """
562 return repo.opener.tryread('clonebundles.manifest')
562 return repo.opener.tryread('clonebundles.manifest')
563
563
564 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
564 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
565 'known', 'getbundle', 'unbundlehash', 'batch']
565 'known', 'getbundle', 'unbundlehash', 'batch']
566
566
567 def _capabilities(repo, proto):
567 def _capabilities(repo, proto):
568 """return a list of capabilities for a repo
568 """return a list of capabilities for a repo
569
569
570 This function exists to allow extensions to easily wrap capabilities
570 This function exists to allow extensions to easily wrap capabilities
571 computation
571 computation
572
572
573 - returns a lists: easy to alter
573 - returns a lists: easy to alter
574 - change done here will be propagated to both `capabilities` and `hello`
574 - change done here will be propagated to both `capabilities` and `hello`
575 command without any other action needed.
575 command without any other action needed.
576 """
576 """
577 # copy to prevent modification of the global list
577 # copy to prevent modification of the global list
578 caps = list(wireprotocaps)
578 caps = list(wireprotocaps)
579 if streamclone.allowservergeneration(repo.ui):
579 if streamclone.allowservergeneration(repo.ui):
580 if repo.ui.configbool('server', 'preferuncompressed', False):
580 if repo.ui.configbool('server', 'preferuncompressed', False):
581 caps.append('stream-preferred')
581 caps.append('stream-preferred')
582 requiredformats = repo.requirements & repo.supportedformats
582 requiredformats = repo.requirements & repo.supportedformats
583 # if our local revlogs are just revlogv1, add 'stream' cap
583 # if our local revlogs are just revlogv1, add 'stream' cap
584 if not requiredformats - set(('revlogv1',)):
584 if not requiredformats - set(('revlogv1',)):
585 caps.append('stream')
585 caps.append('stream')
586 # otherwise, add 'streamreqs' detailing our local revlog format
586 # otherwise, add 'streamreqs' detailing our local revlog format
587 else:
587 else:
588 caps.append('streamreqs=%s' % ','.join(requiredformats))
588 caps.append('streamreqs=%s' % ','.join(sorted(requiredformats)))
589 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
589 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
590 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
590 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
591 caps.append('bundle2=' + urllib.quote(capsblob))
591 caps.append('bundle2=' + urllib.quote(capsblob))
592 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
592 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
593 caps.append(
593 caps.append(
594 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
594 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
595 return caps
595 return caps
596
596
597 # If you are writing an extension and consider wrapping this function. Wrap
597 # If you are writing an extension and consider wrapping this function. Wrap
598 # `_capabilities` instead.
598 # `_capabilities` instead.
599 @wireprotocommand('capabilities')
599 @wireprotocommand('capabilities')
600 def capabilities(repo, proto):
600 def capabilities(repo, proto):
601 return ' '.join(_capabilities(repo, proto))
601 return ' '.join(_capabilities(repo, proto))
602
602
603 @wireprotocommand('changegroup', 'roots')
603 @wireprotocommand('changegroup', 'roots')
604 def changegroup(repo, proto, roots):
604 def changegroup(repo, proto, roots):
605 nodes = decodelist(roots)
605 nodes = decodelist(roots)
606 cg = changegroupmod.changegroup(repo, nodes, 'serve')
606 cg = changegroupmod.changegroup(repo, nodes, 'serve')
607 return streamres(proto.groupchunks(cg))
607 return streamres(proto.groupchunks(cg))
608
608
609 @wireprotocommand('changegroupsubset', 'bases heads')
609 @wireprotocommand('changegroupsubset', 'bases heads')
610 def changegroupsubset(repo, proto, bases, heads):
610 def changegroupsubset(repo, proto, bases, heads):
611 bases = decodelist(bases)
611 bases = decodelist(bases)
612 heads = decodelist(heads)
612 heads = decodelist(heads)
613 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
613 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
614 return streamres(proto.groupchunks(cg))
614 return streamres(proto.groupchunks(cg))
615
615
616 @wireprotocommand('debugwireargs', 'one two *')
616 @wireprotocommand('debugwireargs', 'one two *')
617 def debugwireargs(repo, proto, one, two, others):
617 def debugwireargs(repo, proto, one, two, others):
618 # only accept optional args from the known set
618 # only accept optional args from the known set
619 opts = options('debugwireargs', ['three', 'four'], others)
619 opts = options('debugwireargs', ['three', 'four'], others)
620 return repo.debugwireargs(one, two, **opts)
620 return repo.debugwireargs(one, two, **opts)
621
621
622 # List of options accepted by getbundle.
622 # List of options accepted by getbundle.
623 #
623 #
624 # Meant to be extended by extensions. It is the extension's responsibility to
624 # Meant to be extended by extensions. It is the extension's responsibility to
625 # ensure such options are properly processed in exchange.getbundle.
625 # ensure such options are properly processed in exchange.getbundle.
626 gboptslist = ['heads', 'common', 'bundlecaps']
626 gboptslist = ['heads', 'common', 'bundlecaps']
627
627
628 @wireprotocommand('getbundle', '*')
628 @wireprotocommand('getbundle', '*')
629 def getbundle(repo, proto, others):
629 def getbundle(repo, proto, others):
630 opts = options('getbundle', gboptsmap.keys(), others)
630 opts = options('getbundle', gboptsmap.keys(), others)
631 for k, v in opts.iteritems():
631 for k, v in opts.iteritems():
632 keytype = gboptsmap[k]
632 keytype = gboptsmap[k]
633 if keytype == 'nodes':
633 if keytype == 'nodes':
634 opts[k] = decodelist(v)
634 opts[k] = decodelist(v)
635 elif keytype == 'csv':
635 elif keytype == 'csv':
636 opts[k] = list(v.split(','))
636 opts[k] = list(v.split(','))
637 elif keytype == 'scsv':
637 elif keytype == 'scsv':
638 opts[k] = set(v.split(','))
638 opts[k] = set(v.split(','))
639 elif keytype == 'boolean':
639 elif keytype == 'boolean':
640 # Client should serialize False as '0', which is a non-empty string
640 # Client should serialize False as '0', which is a non-empty string
641 # so it evaluates as a True bool.
641 # so it evaluates as a True bool.
642 if v == '0':
642 if v == '0':
643 opts[k] = False
643 opts[k] = False
644 else:
644 else:
645 opts[k] = bool(v)
645 opts[k] = bool(v)
646 elif keytype != 'plain':
646 elif keytype != 'plain':
647 raise KeyError('unknown getbundle option type %s'
647 raise KeyError('unknown getbundle option type %s'
648 % keytype)
648 % keytype)
649 cg = exchange.getbundle(repo, 'serve', **opts)
649 cg = exchange.getbundle(repo, 'serve', **opts)
650 return streamres(proto.groupchunks(cg))
650 return streamres(proto.groupchunks(cg))
651
651
652 @wireprotocommand('heads')
652 @wireprotocommand('heads')
653 def heads(repo, proto):
653 def heads(repo, proto):
654 h = repo.heads()
654 h = repo.heads()
655 return encodelist(h) + "\n"
655 return encodelist(h) + "\n"
656
656
657 @wireprotocommand('hello')
657 @wireprotocommand('hello')
658 def hello(repo, proto):
658 def hello(repo, proto):
659 '''the hello command returns a set of lines describing various
659 '''the hello command returns a set of lines describing various
660 interesting things about the server, in an RFC822-like format.
660 interesting things about the server, in an RFC822-like format.
661 Currently the only one defined is "capabilities", which
661 Currently the only one defined is "capabilities", which
662 consists of a line in the form:
662 consists of a line in the form:
663
663
664 capabilities: space separated list of tokens
664 capabilities: space separated list of tokens
665 '''
665 '''
666 return "capabilities: %s\n" % (capabilities(repo, proto))
666 return "capabilities: %s\n" % (capabilities(repo, proto))
667
667
668 @wireprotocommand('listkeys', 'namespace')
668 @wireprotocommand('listkeys', 'namespace')
669 def listkeys(repo, proto, namespace):
669 def listkeys(repo, proto, namespace):
670 d = repo.listkeys(encoding.tolocal(namespace)).items()
670 d = repo.listkeys(encoding.tolocal(namespace)).items()
671 return pushkeymod.encodekeys(d)
671 return pushkeymod.encodekeys(d)
672
672
673 @wireprotocommand('lookup', 'key')
673 @wireprotocommand('lookup', 'key')
674 def lookup(repo, proto, key):
674 def lookup(repo, proto, key):
675 try:
675 try:
676 k = encoding.tolocal(key)
676 k = encoding.tolocal(key)
677 c = repo[k]
677 c = repo[k]
678 r = c.hex()
678 r = c.hex()
679 success = 1
679 success = 1
680 except Exception as inst:
680 except Exception as inst:
681 r = str(inst)
681 r = str(inst)
682 success = 0
682 success = 0
683 return "%s %s\n" % (success, r)
683 return "%s %s\n" % (success, r)
684
684
685 @wireprotocommand('known', 'nodes *')
685 @wireprotocommand('known', 'nodes *')
686 def known(repo, proto, nodes, others):
686 def known(repo, proto, nodes, others):
687 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
687 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
688
688
689 @wireprotocommand('pushkey', 'namespace key old new')
689 @wireprotocommand('pushkey', 'namespace key old new')
690 def pushkey(repo, proto, namespace, key, old, new):
690 def pushkey(repo, proto, namespace, key, old, new):
691 # compatibility with pre-1.8 clients which were accidentally
691 # compatibility with pre-1.8 clients which were accidentally
692 # sending raw binary nodes rather than utf-8-encoded hex
692 # sending raw binary nodes rather than utf-8-encoded hex
693 if len(new) == 20 and new.encode('string-escape') != new:
693 if len(new) == 20 and new.encode('string-escape') != new:
694 # looks like it could be a binary node
694 # looks like it could be a binary node
695 try:
695 try:
696 new.decode('utf-8')
696 new.decode('utf-8')
697 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
697 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
698 except UnicodeDecodeError:
698 except UnicodeDecodeError:
699 pass # binary, leave unmodified
699 pass # binary, leave unmodified
700 else:
700 else:
701 new = encoding.tolocal(new) # normal path
701 new = encoding.tolocal(new) # normal path
702
702
703 if util.safehasattr(proto, 'restore'):
703 if util.safehasattr(proto, 'restore'):
704
704
705 proto.redirect()
705 proto.redirect()
706
706
707 try:
707 try:
708 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
708 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
709 encoding.tolocal(old), new) or False
709 encoding.tolocal(old), new) or False
710 except error.Abort:
710 except error.Abort:
711 r = False
711 r = False
712
712
713 output = proto.restore()
713 output = proto.restore()
714
714
715 return '%s\n%s' % (int(r), output)
715 return '%s\n%s' % (int(r), output)
716
716
717 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
717 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
718 encoding.tolocal(old), new)
718 encoding.tolocal(old), new)
719 return '%s\n' % int(r)
719 return '%s\n' % int(r)
720
720
721 @wireprotocommand('stream_out')
721 @wireprotocommand('stream_out')
722 def stream(repo, proto):
722 def stream(repo, proto):
723 '''If the server supports streaming clone, it advertises the "stream"
723 '''If the server supports streaming clone, it advertises the "stream"
724 capability with a value representing the version and flags of the repo
724 capability with a value representing the version and flags of the repo
725 it is serving. Client checks to see if it understands the format.
725 it is serving. Client checks to see if it understands the format.
726 '''
726 '''
727 if not streamclone.allowservergeneration(repo.ui):
727 if not streamclone.allowservergeneration(repo.ui):
728 return '1\n'
728 return '1\n'
729
729
730 def getstream(it):
730 def getstream(it):
731 yield '0\n'
731 yield '0\n'
732 for chunk in it:
732 for chunk in it:
733 yield chunk
733 yield chunk
734
734
735 try:
735 try:
736 # LockError may be raised before the first result is yielded. Don't
736 # LockError may be raised before the first result is yielded. Don't
737 # emit output until we're sure we got the lock successfully.
737 # emit output until we're sure we got the lock successfully.
738 it = streamclone.generatev1wireproto(repo)
738 it = streamclone.generatev1wireproto(repo)
739 return streamres(getstream(it))
739 return streamres(getstream(it))
740 except error.LockError:
740 except error.LockError:
741 return '2\n'
741 return '2\n'
742
742
743 @wireprotocommand('unbundle', 'heads')
743 @wireprotocommand('unbundle', 'heads')
744 def unbundle(repo, proto, heads):
744 def unbundle(repo, proto, heads):
745 their_heads = decodelist(heads)
745 their_heads = decodelist(heads)
746
746
747 try:
747 try:
748 proto.redirect()
748 proto.redirect()
749
749
750 exchange.check_heads(repo, their_heads, 'preparing changes')
750 exchange.check_heads(repo, their_heads, 'preparing changes')
751
751
752 # write bundle data to temporary file because it can be big
752 # write bundle data to temporary file because it can be big
753 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
753 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
754 fp = os.fdopen(fd, 'wb+')
754 fp = os.fdopen(fd, 'wb+')
755 r = 0
755 r = 0
756 try:
756 try:
757 proto.getfile(fp)
757 proto.getfile(fp)
758 fp.seek(0)
758 fp.seek(0)
759 gen = exchange.readbundle(repo.ui, fp, None)
759 gen = exchange.readbundle(repo.ui, fp, None)
760 r = exchange.unbundle(repo, gen, their_heads, 'serve',
760 r = exchange.unbundle(repo, gen, their_heads, 'serve',
761 proto._client())
761 proto._client())
762 if util.safehasattr(r, 'addpart'):
762 if util.safehasattr(r, 'addpart'):
763 # The return looks streamable, we are in the bundle2 case and
763 # The return looks streamable, we are in the bundle2 case and
764 # should return a stream.
764 # should return a stream.
765 return streamres(r.getchunks())
765 return streamres(r.getchunks())
766 return pushres(r)
766 return pushres(r)
767
767
768 finally:
768 finally:
769 fp.close()
769 fp.close()
770 os.unlink(tempname)
770 os.unlink(tempname)
771
771
772 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
772 except (error.BundleValueError, error.Abort, error.PushRaced) as exc:
773 # handle non-bundle2 case first
773 # handle non-bundle2 case first
774 if not getattr(exc, 'duringunbundle2', False):
774 if not getattr(exc, 'duringunbundle2', False):
775 try:
775 try:
776 raise
776 raise
777 except error.Abort:
777 except error.Abort:
778 # The old code we moved used sys.stderr directly.
778 # The old code we moved used sys.stderr directly.
779 # We did not change it to minimise code change.
779 # We did not change it to minimise code change.
780 # This need to be moved to something proper.
780 # This need to be moved to something proper.
781 # Feel free to do it.
781 # Feel free to do it.
782 sys.stderr.write("abort: %s\n" % exc)
782 sys.stderr.write("abort: %s\n" % exc)
783 return pushres(0)
783 return pushres(0)
784 except error.PushRaced:
784 except error.PushRaced:
785 return pusherr(str(exc))
785 return pusherr(str(exc))
786
786
787 bundler = bundle2.bundle20(repo.ui)
787 bundler = bundle2.bundle20(repo.ui)
788 for out in getattr(exc, '_bundle2salvagedoutput', ()):
788 for out in getattr(exc, '_bundle2salvagedoutput', ()):
789 bundler.addpart(out)
789 bundler.addpart(out)
790 try:
790 try:
791 try:
791 try:
792 raise
792 raise
793 except error.PushkeyFailed as exc:
793 except error.PushkeyFailed as exc:
794 # check client caps
794 # check client caps
795 remotecaps = getattr(exc, '_replycaps', None)
795 remotecaps = getattr(exc, '_replycaps', None)
796 if (remotecaps is not None
796 if (remotecaps is not None
797 and 'pushkey' not in remotecaps.get('error', ())):
797 and 'pushkey' not in remotecaps.get('error', ())):
798 # no support remote side, fallback to Abort handler.
798 # no support remote side, fallback to Abort handler.
799 raise
799 raise
800 part = bundler.newpart('error:pushkey')
800 part = bundler.newpart('error:pushkey')
801 part.addparam('in-reply-to', exc.partid)
801 part.addparam('in-reply-to', exc.partid)
802 if exc.namespace is not None:
802 if exc.namespace is not None:
803 part.addparam('namespace', exc.namespace, mandatory=False)
803 part.addparam('namespace', exc.namespace, mandatory=False)
804 if exc.key is not None:
804 if exc.key is not None:
805 part.addparam('key', exc.key, mandatory=False)
805 part.addparam('key', exc.key, mandatory=False)
806 if exc.new is not None:
806 if exc.new is not None:
807 part.addparam('new', exc.new, mandatory=False)
807 part.addparam('new', exc.new, mandatory=False)
808 if exc.old is not None:
808 if exc.old is not None:
809 part.addparam('old', exc.old, mandatory=False)
809 part.addparam('old', exc.old, mandatory=False)
810 if exc.ret is not None:
810 if exc.ret is not None:
811 part.addparam('ret', exc.ret, mandatory=False)
811 part.addparam('ret', exc.ret, mandatory=False)
812 except error.BundleValueError as exc:
812 except error.BundleValueError as exc:
813 errpart = bundler.newpart('error:unsupportedcontent')
813 errpart = bundler.newpart('error:unsupportedcontent')
814 if exc.parttype is not None:
814 if exc.parttype is not None:
815 errpart.addparam('parttype', exc.parttype)
815 errpart.addparam('parttype', exc.parttype)
816 if exc.params:
816 if exc.params:
817 errpart.addparam('params', '\0'.join(exc.params))
817 errpart.addparam('params', '\0'.join(exc.params))
818 except error.Abort as exc:
818 except error.Abort as exc:
819 manargs = [('message', str(exc))]
819 manargs = [('message', str(exc))]
820 advargs = []
820 advargs = []
821 if exc.hint is not None:
821 if exc.hint is not None:
822 advargs.append(('hint', exc.hint))
822 advargs.append(('hint', exc.hint))
823 bundler.addpart(bundle2.bundlepart('error:abort',
823 bundler.addpart(bundle2.bundlepart('error:abort',
824 manargs, advargs))
824 manargs, advargs))
825 except error.PushRaced as exc:
825 except error.PushRaced as exc:
826 bundler.newpart('error:pushraced', [('message', str(exc))])
826 bundler.newpart('error:pushraced', [('message', str(exc))])
827 return streamres(bundler.getchunks())
827 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now