##// END OF EJS Templates
wireproto: make wirepeer look-before-you-leap on batching...
Augie Fackler -
r25913:fa14ba7b default
parent child Browse files
Show More
@@ -1,792 +1,795 b''
1 # wireproto.py - generic wire protocol support functions
1 # wireproto.py - generic wire protocol support functions
2 #
2 #
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
3 # Copyright 2005-2010 Matt Mackall <mpm@selenic.com>
4 #
4 #
5 # This software may be used and distributed according to the terms of the
5 # This software may be used and distributed according to the terms of the
6 # GNU General Public License version 2 or any later version.
6 # GNU General Public License version 2 or any later version.
7
7
8 import urllib, tempfile, os, sys
8 import urllib, tempfile, os, sys
9 from i18n import _
9 from i18n import _
10 from node import bin, hex
10 from node import bin, hex
11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
11 import changegroup as changegroupmod, bundle2, pushkey as pushkeymod
12 import peer, error, encoding, util, exchange
12 import peer, error, encoding, util, exchange
13
13
14
14
15 class abstractserverproto(object):
15 class abstractserverproto(object):
16 """abstract class that summarizes the protocol API
16 """abstract class that summarizes the protocol API
17
17
18 Used as reference and documentation.
18 Used as reference and documentation.
19 """
19 """
20
20
21 def getargs(self, args):
21 def getargs(self, args):
22 """return the value for arguments in <args>
22 """return the value for arguments in <args>
23
23
24 returns a list of values (same order as <args>)"""
24 returns a list of values (same order as <args>)"""
25 raise NotImplementedError()
25 raise NotImplementedError()
26
26
27 def getfile(self, fp):
27 def getfile(self, fp):
28 """write the whole content of a file into a file like object
28 """write the whole content of a file into a file like object
29
29
30 The file is in the form::
30 The file is in the form::
31
31
32 (<chunk-size>\n<chunk>)+0\n
32 (<chunk-size>\n<chunk>)+0\n
33
33
34 chunk size is the ascii version of the int.
34 chunk size is the ascii version of the int.
35 """
35 """
36 raise NotImplementedError()
36 raise NotImplementedError()
37
37
38 def redirect(self):
38 def redirect(self):
39 """may setup interception for stdout and stderr
39 """may setup interception for stdout and stderr
40
40
41 See also the `restore` method."""
41 See also the `restore` method."""
42 raise NotImplementedError()
42 raise NotImplementedError()
43
43
44 # If the `redirect` function does install interception, the `restore`
44 # If the `redirect` function does install interception, the `restore`
45 # function MUST be defined. If interception is not used, this function
45 # function MUST be defined. If interception is not used, this function
46 # MUST NOT be defined.
46 # MUST NOT be defined.
47 #
47 #
48 # left commented here on purpose
48 # left commented here on purpose
49 #
49 #
50 #def restore(self):
50 #def restore(self):
51 # """reinstall previous stdout and stderr and return intercepted stdout
51 # """reinstall previous stdout and stderr and return intercepted stdout
52 # """
52 # """
53 # raise NotImplementedError()
53 # raise NotImplementedError()
54
54
55 def groupchunks(self, cg):
55 def groupchunks(self, cg):
56 """return 4096 chunks from a changegroup object
56 """return 4096 chunks from a changegroup object
57
57
58 Some protocols may have compressed the contents."""
58 Some protocols may have compressed the contents."""
59 raise NotImplementedError()
59 raise NotImplementedError()
60
60
61 class remotebatch(peer.batcher):
61 class remotebatch(peer.batcher):
62 '''batches the queued calls; uses as few roundtrips as possible'''
62 '''batches the queued calls; uses as few roundtrips as possible'''
63 def __init__(self, remote):
63 def __init__(self, remote):
64 '''remote must support _submitbatch(encbatch) and
64 '''remote must support _submitbatch(encbatch) and
65 _submitone(op, encargs)'''
65 _submitone(op, encargs)'''
66 peer.batcher.__init__(self)
66 peer.batcher.__init__(self)
67 self.remote = remote
67 self.remote = remote
68 def submit(self):
68 def submit(self):
69 req, rsp = [], []
69 req, rsp = [], []
70 for name, args, opts, resref in self.calls:
70 for name, args, opts, resref in self.calls:
71 mtd = getattr(self.remote, name)
71 mtd = getattr(self.remote, name)
72 batchablefn = getattr(mtd, 'batchable', None)
72 batchablefn = getattr(mtd, 'batchable', None)
73 if batchablefn is not None:
73 if batchablefn is not None:
74 batchable = batchablefn(mtd.im_self, *args, **opts)
74 batchable = batchablefn(mtd.im_self, *args, **opts)
75 encargsorres, encresref = batchable.next()
75 encargsorres, encresref = batchable.next()
76 if encresref:
76 if encresref:
77 req.append((name, encargsorres,))
77 req.append((name, encargsorres,))
78 rsp.append((batchable, encresref, resref,))
78 rsp.append((batchable, encresref, resref,))
79 else:
79 else:
80 resref.set(encargsorres)
80 resref.set(encargsorres)
81 else:
81 else:
82 if req:
82 if req:
83 self._submitreq(req, rsp)
83 self._submitreq(req, rsp)
84 req, rsp = [], []
84 req, rsp = [], []
85 resref.set(mtd(*args, **opts))
85 resref.set(mtd(*args, **opts))
86 if req:
86 if req:
87 self._submitreq(req, rsp)
87 self._submitreq(req, rsp)
88 def _submitreq(self, req, rsp):
88 def _submitreq(self, req, rsp):
89 encresults = self.remote._submitbatch(req)
89 encresults = self.remote._submitbatch(req)
90 for encres, r in zip(encresults, rsp):
90 for encres, r in zip(encresults, rsp):
91 batchable, encresref, resref = r
91 batchable, encresref, resref = r
92 encresref.set(encres)
92 encresref.set(encres)
93 resref.set(batchable.next())
93 resref.set(batchable.next())
94
94
95 # Forward a couple of names from peer to make wireproto interactions
95 # Forward a couple of names from peer to make wireproto interactions
96 # slightly more sensible.
96 # slightly more sensible.
97 batchable = peer.batchable
97 batchable = peer.batchable
98 future = peer.future
98 future = peer.future
99
99
100 # list of nodes encoding / decoding
100 # list of nodes encoding / decoding
101
101
102 def decodelist(l, sep=' '):
102 def decodelist(l, sep=' '):
103 if l:
103 if l:
104 return map(bin, l.split(sep))
104 return map(bin, l.split(sep))
105 return []
105 return []
106
106
107 def encodelist(l, sep=' '):
107 def encodelist(l, sep=' '):
108 try:
108 try:
109 return sep.join(map(hex, l))
109 return sep.join(map(hex, l))
110 except TypeError:
110 except TypeError:
111 raise
111 raise
112
112
113 # batched call argument encoding
113 # batched call argument encoding
114
114
115 def escapearg(plain):
115 def escapearg(plain):
116 return (plain
116 return (plain
117 .replace(':', ':c')
117 .replace(':', ':c')
118 .replace(',', ':o')
118 .replace(',', ':o')
119 .replace(';', ':s')
119 .replace(';', ':s')
120 .replace('=', ':e'))
120 .replace('=', ':e'))
121
121
122 def unescapearg(escaped):
122 def unescapearg(escaped):
123 return (escaped
123 return (escaped
124 .replace(':e', '=')
124 .replace(':e', '=')
125 .replace(':s', ';')
125 .replace(':s', ';')
126 .replace(':o', ',')
126 .replace(':o', ',')
127 .replace(':c', ':'))
127 .replace(':c', ':'))
128
128
129 # mapping of options accepted by getbundle and their types
129 # mapping of options accepted by getbundle and their types
130 #
130 #
131 # Meant to be extended by extensions. It is extensions responsibility to ensure
131 # Meant to be extended by extensions. It is extensions responsibility to ensure
132 # such options are properly processed in exchange.getbundle.
132 # such options are properly processed in exchange.getbundle.
133 #
133 #
134 # supported types are:
134 # supported types are:
135 #
135 #
136 # :nodes: list of binary nodes
136 # :nodes: list of binary nodes
137 # :csv: list of comma-separated values
137 # :csv: list of comma-separated values
138 # :scsv: list of comma-separated values return as set
138 # :scsv: list of comma-separated values return as set
139 # :plain: string with no transformation needed.
139 # :plain: string with no transformation needed.
140 gboptsmap = {'heads': 'nodes',
140 gboptsmap = {'heads': 'nodes',
141 'common': 'nodes',
141 'common': 'nodes',
142 'obsmarkers': 'boolean',
142 'obsmarkers': 'boolean',
143 'bundlecaps': 'scsv',
143 'bundlecaps': 'scsv',
144 'listkeys': 'csv',
144 'listkeys': 'csv',
145 'cg': 'boolean'}
145 'cg': 'boolean'}
146
146
147 # client side
147 # client side
148
148
149 class wirepeer(peer.peerrepository):
149 class wirepeer(peer.peerrepository):
150
150
151 def batch(self):
151 def batch(self):
152 if self.capable('batch'):
152 return remotebatch(self)
153 return remotebatch(self)
154 else:
155 return peer.localbatch(self)
153 def _submitbatch(self, req):
156 def _submitbatch(self, req):
154 cmds = []
157 cmds = []
155 for op, argsdict in req:
158 for op, argsdict in req:
156 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
159 args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
157 for k, v in argsdict.iteritems())
160 for k, v in argsdict.iteritems())
158 cmds.append('%s %s' % (op, args))
161 cmds.append('%s %s' % (op, args))
159 rsp = self._call("batch", cmds=';'.join(cmds))
162 rsp = self._call("batch", cmds=';'.join(cmds))
160 return [unescapearg(r) for r in rsp.split(';')]
163 return [unescapearg(r) for r in rsp.split(';')]
161 def _submitone(self, op, args):
164 def _submitone(self, op, args):
162 return self._call(op, **args)
165 return self._call(op, **args)
163
166
164 @batchable
167 @batchable
165 def lookup(self, key):
168 def lookup(self, key):
166 self.requirecap('lookup', _('look up remote revision'))
169 self.requirecap('lookup', _('look up remote revision'))
167 f = future()
170 f = future()
168 yield {'key': encoding.fromlocal(key)}, f
171 yield {'key': encoding.fromlocal(key)}, f
169 d = f.value
172 d = f.value
170 success, data = d[:-1].split(" ", 1)
173 success, data = d[:-1].split(" ", 1)
171 if int(success):
174 if int(success):
172 yield bin(data)
175 yield bin(data)
173 self._abort(error.RepoError(data))
176 self._abort(error.RepoError(data))
174
177
175 @batchable
178 @batchable
176 def heads(self):
179 def heads(self):
177 f = future()
180 f = future()
178 yield {}, f
181 yield {}, f
179 d = f.value
182 d = f.value
180 try:
183 try:
181 yield decodelist(d[:-1])
184 yield decodelist(d[:-1])
182 except ValueError:
185 except ValueError:
183 self._abort(error.ResponseError(_("unexpected response:"), d))
186 self._abort(error.ResponseError(_("unexpected response:"), d))
184
187
185 @batchable
188 @batchable
186 def known(self, nodes):
189 def known(self, nodes):
187 f = future()
190 f = future()
188 yield {'nodes': encodelist(nodes)}, f
191 yield {'nodes': encodelist(nodes)}, f
189 d = f.value
192 d = f.value
190 try:
193 try:
191 yield [bool(int(b)) for b in d]
194 yield [bool(int(b)) for b in d]
192 except ValueError:
195 except ValueError:
193 self._abort(error.ResponseError(_("unexpected response:"), d))
196 self._abort(error.ResponseError(_("unexpected response:"), d))
194
197
195 @batchable
198 @batchable
196 def branchmap(self):
199 def branchmap(self):
197 f = future()
200 f = future()
198 yield {}, f
201 yield {}, f
199 d = f.value
202 d = f.value
200 try:
203 try:
201 branchmap = {}
204 branchmap = {}
202 for branchpart in d.splitlines():
205 for branchpart in d.splitlines():
203 branchname, branchheads = branchpart.split(' ', 1)
206 branchname, branchheads = branchpart.split(' ', 1)
204 branchname = encoding.tolocal(urllib.unquote(branchname))
207 branchname = encoding.tolocal(urllib.unquote(branchname))
205 branchheads = decodelist(branchheads)
208 branchheads = decodelist(branchheads)
206 branchmap[branchname] = branchheads
209 branchmap[branchname] = branchheads
207 yield branchmap
210 yield branchmap
208 except TypeError:
211 except TypeError:
209 self._abort(error.ResponseError(_("unexpected response:"), d))
212 self._abort(error.ResponseError(_("unexpected response:"), d))
210
213
211 def branches(self, nodes):
214 def branches(self, nodes):
212 n = encodelist(nodes)
215 n = encodelist(nodes)
213 d = self._call("branches", nodes=n)
216 d = self._call("branches", nodes=n)
214 try:
217 try:
215 br = [tuple(decodelist(b)) for b in d.splitlines()]
218 br = [tuple(decodelist(b)) for b in d.splitlines()]
216 return br
219 return br
217 except ValueError:
220 except ValueError:
218 self._abort(error.ResponseError(_("unexpected response:"), d))
221 self._abort(error.ResponseError(_("unexpected response:"), d))
219
222
220 def between(self, pairs):
223 def between(self, pairs):
221 batch = 8 # avoid giant requests
224 batch = 8 # avoid giant requests
222 r = []
225 r = []
223 for i in xrange(0, len(pairs), batch):
226 for i in xrange(0, len(pairs), batch):
224 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
227 n = " ".join([encodelist(p, '-') for p in pairs[i:i + batch]])
225 d = self._call("between", pairs=n)
228 d = self._call("between", pairs=n)
226 try:
229 try:
227 r.extend(l and decodelist(l) or [] for l in d.splitlines())
230 r.extend(l and decodelist(l) or [] for l in d.splitlines())
228 except ValueError:
231 except ValueError:
229 self._abort(error.ResponseError(_("unexpected response:"), d))
232 self._abort(error.ResponseError(_("unexpected response:"), d))
230 return r
233 return r
231
234
232 @batchable
235 @batchable
233 def pushkey(self, namespace, key, old, new):
236 def pushkey(self, namespace, key, old, new):
234 if not self.capable('pushkey'):
237 if not self.capable('pushkey'):
235 yield False, None
238 yield False, None
236 f = future()
239 f = future()
237 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
240 self.ui.debug('preparing pushkey for "%s:%s"\n' % (namespace, key))
238 yield {'namespace': encoding.fromlocal(namespace),
241 yield {'namespace': encoding.fromlocal(namespace),
239 'key': encoding.fromlocal(key),
242 'key': encoding.fromlocal(key),
240 'old': encoding.fromlocal(old),
243 'old': encoding.fromlocal(old),
241 'new': encoding.fromlocal(new)}, f
244 'new': encoding.fromlocal(new)}, f
242 d = f.value
245 d = f.value
243 d, output = d.split('\n', 1)
246 d, output = d.split('\n', 1)
244 try:
247 try:
245 d = bool(int(d))
248 d = bool(int(d))
246 except ValueError:
249 except ValueError:
247 raise error.ResponseError(
250 raise error.ResponseError(
248 _('push failed (unexpected response):'), d)
251 _('push failed (unexpected response):'), d)
249 for l in output.splitlines(True):
252 for l in output.splitlines(True):
250 self.ui.status(_('remote: '), l)
253 self.ui.status(_('remote: '), l)
251 yield d
254 yield d
252
255
253 @batchable
256 @batchable
254 def listkeys(self, namespace):
257 def listkeys(self, namespace):
255 if not self.capable('pushkey'):
258 if not self.capable('pushkey'):
256 yield {}, None
259 yield {}, None
257 f = future()
260 f = future()
258 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
261 self.ui.debug('preparing listkeys for "%s"\n' % namespace)
259 yield {'namespace': encoding.fromlocal(namespace)}, f
262 yield {'namespace': encoding.fromlocal(namespace)}, f
260 d = f.value
263 d = f.value
261 self.ui.debug('received listkey for "%s": %i bytes\n'
264 self.ui.debug('received listkey for "%s": %i bytes\n'
262 % (namespace, len(d)))
265 % (namespace, len(d)))
263 yield pushkeymod.decodekeys(d)
266 yield pushkeymod.decodekeys(d)
264
267
265 def stream_out(self):
268 def stream_out(self):
266 return self._callstream('stream_out')
269 return self._callstream('stream_out')
267
270
268 def changegroup(self, nodes, kind):
271 def changegroup(self, nodes, kind):
269 n = encodelist(nodes)
272 n = encodelist(nodes)
270 f = self._callcompressable("changegroup", roots=n)
273 f = self._callcompressable("changegroup", roots=n)
271 return changegroupmod.cg1unpacker(f, 'UN')
274 return changegroupmod.cg1unpacker(f, 'UN')
272
275
273 def changegroupsubset(self, bases, heads, kind):
276 def changegroupsubset(self, bases, heads, kind):
274 self.requirecap('changegroupsubset', _('look up remote changes'))
277 self.requirecap('changegroupsubset', _('look up remote changes'))
275 bases = encodelist(bases)
278 bases = encodelist(bases)
276 heads = encodelist(heads)
279 heads = encodelist(heads)
277 f = self._callcompressable("changegroupsubset",
280 f = self._callcompressable("changegroupsubset",
278 bases=bases, heads=heads)
281 bases=bases, heads=heads)
279 return changegroupmod.cg1unpacker(f, 'UN')
282 return changegroupmod.cg1unpacker(f, 'UN')
280
283
281 def getbundle(self, source, **kwargs):
284 def getbundle(self, source, **kwargs):
282 self.requirecap('getbundle', _('look up remote changes'))
285 self.requirecap('getbundle', _('look up remote changes'))
283 opts = {}
286 opts = {}
284 bundlecaps = kwargs.get('bundlecaps')
287 bundlecaps = kwargs.get('bundlecaps')
285 if bundlecaps is not None:
288 if bundlecaps is not None:
286 kwargs['bundlecaps'] = sorted(bundlecaps)
289 kwargs['bundlecaps'] = sorted(bundlecaps)
287 else:
290 else:
288 bundlecaps = () # kwargs could have it to None
291 bundlecaps = () # kwargs could have it to None
289 for key, value in kwargs.iteritems():
292 for key, value in kwargs.iteritems():
290 if value is None:
293 if value is None:
291 continue
294 continue
292 keytype = gboptsmap.get(key)
295 keytype = gboptsmap.get(key)
293 if keytype is None:
296 if keytype is None:
294 assert False, 'unexpected'
297 assert False, 'unexpected'
295 elif keytype == 'nodes':
298 elif keytype == 'nodes':
296 value = encodelist(value)
299 value = encodelist(value)
297 elif keytype in ('csv', 'scsv'):
300 elif keytype in ('csv', 'scsv'):
298 value = ','.join(value)
301 value = ','.join(value)
299 elif keytype == 'boolean':
302 elif keytype == 'boolean':
300 value = '%i' % bool(value)
303 value = '%i' % bool(value)
301 elif keytype != 'plain':
304 elif keytype != 'plain':
302 raise KeyError('unknown getbundle option type %s'
305 raise KeyError('unknown getbundle option type %s'
303 % keytype)
306 % keytype)
304 opts[key] = value
307 opts[key] = value
305 f = self._callcompressable("getbundle", **opts)
308 f = self._callcompressable("getbundle", **opts)
306 if any((cap.startswith('HG2') for cap in bundlecaps)):
309 if any((cap.startswith('HG2') for cap in bundlecaps)):
307 return bundle2.getunbundler(self.ui, f)
310 return bundle2.getunbundler(self.ui, f)
308 else:
311 else:
309 return changegroupmod.cg1unpacker(f, 'UN')
312 return changegroupmod.cg1unpacker(f, 'UN')
310
313
311 def unbundle(self, cg, heads, source):
314 def unbundle(self, cg, heads, source):
312 '''Send cg (a readable file-like object representing the
315 '''Send cg (a readable file-like object representing the
313 changegroup to push, typically a chunkbuffer object) to the
316 changegroup to push, typically a chunkbuffer object) to the
314 remote server as a bundle.
317 remote server as a bundle.
315
318
316 When pushing a bundle10 stream, return an integer indicating the
319 When pushing a bundle10 stream, return an integer indicating the
317 result of the push (see localrepository.addchangegroup()).
320 result of the push (see localrepository.addchangegroup()).
318
321
319 When pushing a bundle20 stream, return a bundle20 stream.'''
322 When pushing a bundle20 stream, return a bundle20 stream.'''
320
323
321 if heads != ['force'] and self.capable('unbundlehash'):
324 if heads != ['force'] and self.capable('unbundlehash'):
322 heads = encodelist(['hashed',
325 heads = encodelist(['hashed',
323 util.sha1(''.join(sorted(heads))).digest()])
326 util.sha1(''.join(sorted(heads))).digest()])
324 else:
327 else:
325 heads = encodelist(heads)
328 heads = encodelist(heads)
326
329
327 if util.safehasattr(cg, 'deltaheader'):
330 if util.safehasattr(cg, 'deltaheader'):
328 # this a bundle10, do the old style call sequence
331 # this a bundle10, do the old style call sequence
329 ret, output = self._callpush("unbundle", cg, heads=heads)
332 ret, output = self._callpush("unbundle", cg, heads=heads)
330 if ret == "":
333 if ret == "":
331 raise error.ResponseError(
334 raise error.ResponseError(
332 _('push failed:'), output)
335 _('push failed:'), output)
333 try:
336 try:
334 ret = int(ret)
337 ret = int(ret)
335 except ValueError:
338 except ValueError:
336 raise error.ResponseError(
339 raise error.ResponseError(
337 _('push failed (unexpected response):'), ret)
340 _('push failed (unexpected response):'), ret)
338
341
339 for l in output.splitlines(True):
342 for l in output.splitlines(True):
340 self.ui.status(_('remote: '), l)
343 self.ui.status(_('remote: '), l)
341 else:
344 else:
342 # bundle2 push. Send a stream, fetch a stream.
345 # bundle2 push. Send a stream, fetch a stream.
343 stream = self._calltwowaystream('unbundle', cg, heads=heads)
346 stream = self._calltwowaystream('unbundle', cg, heads=heads)
344 ret = bundle2.getunbundler(self.ui, stream)
347 ret = bundle2.getunbundler(self.ui, stream)
345 return ret
348 return ret
346
349
347 def debugwireargs(self, one, two, three=None, four=None, five=None):
350 def debugwireargs(self, one, two, three=None, four=None, five=None):
348 # don't pass optional arguments left at their default value
351 # don't pass optional arguments left at their default value
349 opts = {}
352 opts = {}
350 if three is not None:
353 if three is not None:
351 opts['three'] = three
354 opts['three'] = three
352 if four is not None:
355 if four is not None:
353 opts['four'] = four
356 opts['four'] = four
354 return self._call('debugwireargs', one=one, two=two, **opts)
357 return self._call('debugwireargs', one=one, two=two, **opts)
355
358
356 def _call(self, cmd, **args):
359 def _call(self, cmd, **args):
357 """execute <cmd> on the server
360 """execute <cmd> on the server
358
361
359 The command is expected to return a simple string.
362 The command is expected to return a simple string.
360
363
361 returns the server reply as a string."""
364 returns the server reply as a string."""
362 raise NotImplementedError()
365 raise NotImplementedError()
363
366
364 def _callstream(self, cmd, **args):
367 def _callstream(self, cmd, **args):
365 """execute <cmd> on the server
368 """execute <cmd> on the server
366
369
367 The command is expected to return a stream.
370 The command is expected to return a stream.
368
371
369 returns the server reply as a file like object."""
372 returns the server reply as a file like object."""
370 raise NotImplementedError()
373 raise NotImplementedError()
371
374
372 def _callcompressable(self, cmd, **args):
375 def _callcompressable(self, cmd, **args):
373 """execute <cmd> on the server
376 """execute <cmd> on the server
374
377
375 The command is expected to return a stream.
378 The command is expected to return a stream.
376
379
377 The stream may have been compressed in some implementations. This
380 The stream may have been compressed in some implementations. This
378 function takes care of the decompression. This is the only difference
381 function takes care of the decompression. This is the only difference
379 with _callstream.
382 with _callstream.
380
383
381 returns the server reply as a file like object.
384 returns the server reply as a file like object.
382 """
385 """
383 raise NotImplementedError()
386 raise NotImplementedError()
384
387
385 def _callpush(self, cmd, fp, **args):
388 def _callpush(self, cmd, fp, **args):
386 """execute a <cmd> on server
389 """execute a <cmd> on server
387
390
388 The command is expected to be related to a push. Push has a special
391 The command is expected to be related to a push. Push has a special
389 return method.
392 return method.
390
393
391 returns the server reply as a (ret, output) tuple. ret is either
394 returns the server reply as a (ret, output) tuple. ret is either
392 empty (error) or a stringified int.
395 empty (error) or a stringified int.
393 """
396 """
394 raise NotImplementedError()
397 raise NotImplementedError()
395
398
396 def _calltwowaystream(self, cmd, fp, **args):
399 def _calltwowaystream(self, cmd, fp, **args):
397 """execute <cmd> on server
400 """execute <cmd> on server
398
401
399 The command will send a stream to the server and get a stream in reply.
402 The command will send a stream to the server and get a stream in reply.
400 """
403 """
401 raise NotImplementedError()
404 raise NotImplementedError()
402
405
403 def _abort(self, exception):
406 def _abort(self, exception):
404 """clearly abort the wire protocol connection and raise the exception
407 """clearly abort the wire protocol connection and raise the exception
405 """
408 """
406 raise NotImplementedError()
409 raise NotImplementedError()
407
410
408 # server side
411 # server side
409
412
410 # wire protocol command can either return a string or one of these classes.
413 # wire protocol command can either return a string or one of these classes.
411 class streamres(object):
414 class streamres(object):
412 """wireproto reply: binary stream
415 """wireproto reply: binary stream
413
416
414 The call was successful and the result is a stream.
417 The call was successful and the result is a stream.
415 Iterate on the `self.gen` attribute to retrieve chunks.
418 Iterate on the `self.gen` attribute to retrieve chunks.
416 """
419 """
417 def __init__(self, gen):
420 def __init__(self, gen):
418 self.gen = gen
421 self.gen = gen
419
422
420 class pushres(object):
423 class pushres(object):
421 """wireproto reply: success with simple integer return
424 """wireproto reply: success with simple integer return
422
425
423 The call was successful and returned an integer contained in `self.res`.
426 The call was successful and returned an integer contained in `self.res`.
424 """
427 """
425 def __init__(self, res):
428 def __init__(self, res):
426 self.res = res
429 self.res = res
427
430
428 class pusherr(object):
431 class pusherr(object):
429 """wireproto reply: failure
432 """wireproto reply: failure
430
433
431 The call failed. The `self.res` attribute contains the error message.
434 The call failed. The `self.res` attribute contains the error message.
432 """
435 """
433 def __init__(self, res):
436 def __init__(self, res):
434 self.res = res
437 self.res = res
435
438
436 class ooberror(object):
439 class ooberror(object):
437 """wireproto reply: failure of a batch of operation
440 """wireproto reply: failure of a batch of operation
438
441
439 Something failed during a batch call. The error message is stored in
442 Something failed during a batch call. The error message is stored in
440 `self.message`.
443 `self.message`.
441 """
444 """
442 def __init__(self, message):
445 def __init__(self, message):
443 self.message = message
446 self.message = message
444
447
445 def dispatch(repo, proto, command):
448 def dispatch(repo, proto, command):
446 repo = repo.filtered("served")
449 repo = repo.filtered("served")
447 func, spec = commands[command]
450 func, spec = commands[command]
448 args = proto.getargs(spec)
451 args = proto.getargs(spec)
449 return func(repo, proto, *args)
452 return func(repo, proto, *args)
450
453
451 def options(cmd, keys, others):
454 def options(cmd, keys, others):
452 opts = {}
455 opts = {}
453 for k in keys:
456 for k in keys:
454 if k in others:
457 if k in others:
455 opts[k] = others[k]
458 opts[k] = others[k]
456 del others[k]
459 del others[k]
457 if others:
460 if others:
458 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
461 sys.stderr.write("warning: %s ignored unexpected arguments %s\n"
459 % (cmd, ",".join(others)))
462 % (cmd, ",".join(others)))
460 return opts
463 return opts
461
464
462 # list of commands
465 # list of commands
463 commands = {}
466 commands = {}
464
467
465 def wireprotocommand(name, args=''):
468 def wireprotocommand(name, args=''):
466 """decorator for wire protocol command"""
469 """decorator for wire protocol command"""
467 def register(func):
470 def register(func):
468 commands[name] = (func, args)
471 commands[name] = (func, args)
469 return func
472 return func
470 return register
473 return register
471
474
472 @wireprotocommand('batch', 'cmds *')
475 @wireprotocommand('batch', 'cmds *')
473 def batch(repo, proto, cmds, others):
476 def batch(repo, proto, cmds, others):
474 repo = repo.filtered("served")
477 repo = repo.filtered("served")
475 res = []
478 res = []
476 for pair in cmds.split(';'):
479 for pair in cmds.split(';'):
477 op, args = pair.split(' ', 1)
480 op, args = pair.split(' ', 1)
478 vals = {}
481 vals = {}
479 for a in args.split(','):
482 for a in args.split(','):
480 if a:
483 if a:
481 n, v = a.split('=')
484 n, v = a.split('=')
482 vals[n] = unescapearg(v)
485 vals[n] = unescapearg(v)
483 func, spec = commands[op]
486 func, spec = commands[op]
484 if spec:
487 if spec:
485 keys = spec.split()
488 keys = spec.split()
486 data = {}
489 data = {}
487 for k in keys:
490 for k in keys:
488 if k == '*':
491 if k == '*':
489 star = {}
492 star = {}
490 for key in vals.keys():
493 for key in vals.keys():
491 if key not in keys:
494 if key not in keys:
492 star[key] = vals[key]
495 star[key] = vals[key]
493 data['*'] = star
496 data['*'] = star
494 else:
497 else:
495 data[k] = vals[k]
498 data[k] = vals[k]
496 result = func(repo, proto, *[data[k] for k in keys])
499 result = func(repo, proto, *[data[k] for k in keys])
497 else:
500 else:
498 result = func(repo, proto)
501 result = func(repo, proto)
499 if isinstance(result, ooberror):
502 if isinstance(result, ooberror):
500 return result
503 return result
501 res.append(escapearg(result))
504 res.append(escapearg(result))
502 return ';'.join(res)
505 return ';'.join(res)
503
506
504 @wireprotocommand('between', 'pairs')
507 @wireprotocommand('between', 'pairs')
505 def between(repo, proto, pairs):
508 def between(repo, proto, pairs):
506 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
509 pairs = [decodelist(p, '-') for p in pairs.split(" ")]
507 r = []
510 r = []
508 for b in repo.between(pairs):
511 for b in repo.between(pairs):
509 r.append(encodelist(b) + "\n")
512 r.append(encodelist(b) + "\n")
510 return "".join(r)
513 return "".join(r)
511
514
512 @wireprotocommand('branchmap')
515 @wireprotocommand('branchmap')
513 def branchmap(repo, proto):
516 def branchmap(repo, proto):
514 branchmap = repo.branchmap()
517 branchmap = repo.branchmap()
515 heads = []
518 heads = []
516 for branch, nodes in branchmap.iteritems():
519 for branch, nodes in branchmap.iteritems():
517 branchname = urllib.quote(encoding.fromlocal(branch))
520 branchname = urllib.quote(encoding.fromlocal(branch))
518 branchnodes = encodelist(nodes)
521 branchnodes = encodelist(nodes)
519 heads.append('%s %s' % (branchname, branchnodes))
522 heads.append('%s %s' % (branchname, branchnodes))
520 return '\n'.join(heads)
523 return '\n'.join(heads)
521
524
522 @wireprotocommand('branches', 'nodes')
525 @wireprotocommand('branches', 'nodes')
523 def branches(repo, proto, nodes):
526 def branches(repo, proto, nodes):
524 nodes = decodelist(nodes)
527 nodes = decodelist(nodes)
525 r = []
528 r = []
526 for b in repo.branches(nodes):
529 for b in repo.branches(nodes):
527 r.append(encodelist(b) + "\n")
530 r.append(encodelist(b) + "\n")
528 return "".join(r)
531 return "".join(r)
529
532
530
533
531 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
534 wireprotocaps = ['lookup', 'changegroupsubset', 'branchmap', 'pushkey',
532 'known', 'getbundle', 'unbundlehash', 'batch']
535 'known', 'getbundle', 'unbundlehash', 'batch']
533
536
534 def _capabilities(repo, proto):
537 def _capabilities(repo, proto):
535 """return a list of capabilities for a repo
538 """return a list of capabilities for a repo
536
539
537 This function exists to allow extensions to easily wrap capabilities
540 This function exists to allow extensions to easily wrap capabilities
538 computation
541 computation
539
542
540 - returns a lists: easy to alter
543 - returns a lists: easy to alter
541 - change done here will be propagated to both `capabilities` and `hello`
544 - change done here will be propagated to both `capabilities` and `hello`
542 command without any other action needed.
545 command without any other action needed.
543 """
546 """
544 # copy to prevent modification of the global list
547 # copy to prevent modification of the global list
545 caps = list(wireprotocaps)
548 caps = list(wireprotocaps)
546 if _allowstream(repo.ui):
549 if _allowstream(repo.ui):
547 if repo.ui.configbool('server', 'preferuncompressed', False):
550 if repo.ui.configbool('server', 'preferuncompressed', False):
548 caps.append('stream-preferred')
551 caps.append('stream-preferred')
549 requiredformats = repo.requirements & repo.supportedformats
552 requiredformats = repo.requirements & repo.supportedformats
550 # if our local revlogs are just revlogv1, add 'stream' cap
553 # if our local revlogs are just revlogv1, add 'stream' cap
551 if not requiredformats - set(('revlogv1',)):
554 if not requiredformats - set(('revlogv1',)):
552 caps.append('stream')
555 caps.append('stream')
553 # otherwise, add 'streamreqs' detailing our local revlog format
556 # otherwise, add 'streamreqs' detailing our local revlog format
554 else:
557 else:
555 caps.append('streamreqs=%s' % ','.join(requiredformats))
558 caps.append('streamreqs=%s' % ','.join(requiredformats))
556 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
559 if repo.ui.configbool('experimental', 'bundle2-advertise', True):
557 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
560 capsblob = bundle2.encodecaps(bundle2.getrepocaps(repo))
558 caps.append('bundle2=' + urllib.quote(capsblob))
561 caps.append('bundle2=' + urllib.quote(capsblob))
559 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
562 caps.append('unbundle=%s' % ','.join(changegroupmod.bundlepriority))
560 caps.append(
563 caps.append(
561 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
564 'httpheader=%d' % repo.ui.configint('server', 'maxhttpheaderlen', 1024))
562 return caps
565 return caps
563
566
564 # If you are writing an extension and consider wrapping this function. Wrap
567 # If you are writing an extension and consider wrapping this function. Wrap
565 # `_capabilities` instead.
568 # `_capabilities` instead.
566 @wireprotocommand('capabilities')
569 @wireprotocommand('capabilities')
567 def capabilities(repo, proto):
570 def capabilities(repo, proto):
568 return ' '.join(_capabilities(repo, proto))
571 return ' '.join(_capabilities(repo, proto))
569
572
570 @wireprotocommand('changegroup', 'roots')
573 @wireprotocommand('changegroup', 'roots')
571 def changegroup(repo, proto, roots):
574 def changegroup(repo, proto, roots):
572 nodes = decodelist(roots)
575 nodes = decodelist(roots)
573 cg = changegroupmod.changegroup(repo, nodes, 'serve')
576 cg = changegroupmod.changegroup(repo, nodes, 'serve')
574 return streamres(proto.groupchunks(cg))
577 return streamres(proto.groupchunks(cg))
575
578
576 @wireprotocommand('changegroupsubset', 'bases heads')
579 @wireprotocommand('changegroupsubset', 'bases heads')
577 def changegroupsubset(repo, proto, bases, heads):
580 def changegroupsubset(repo, proto, bases, heads):
578 bases = decodelist(bases)
581 bases = decodelist(bases)
579 heads = decodelist(heads)
582 heads = decodelist(heads)
580 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
583 cg = changegroupmod.changegroupsubset(repo, bases, heads, 'serve')
581 return streamres(proto.groupchunks(cg))
584 return streamres(proto.groupchunks(cg))
582
585
583 @wireprotocommand('debugwireargs', 'one two *')
586 @wireprotocommand('debugwireargs', 'one two *')
584 def debugwireargs(repo, proto, one, two, others):
587 def debugwireargs(repo, proto, one, two, others):
585 # only accept optional args from the known set
588 # only accept optional args from the known set
586 opts = options('debugwireargs', ['three', 'four'], others)
589 opts = options('debugwireargs', ['three', 'four'], others)
587 return repo.debugwireargs(one, two, **opts)
590 return repo.debugwireargs(one, two, **opts)
588
591
589 # List of options accepted by getbundle.
592 # List of options accepted by getbundle.
590 #
593 #
591 # Meant to be extended by extensions. It is the extension's responsibility to
594 # Meant to be extended by extensions. It is the extension's responsibility to
592 # ensure such options are properly processed in exchange.getbundle.
595 # ensure such options are properly processed in exchange.getbundle.
593 gboptslist = ['heads', 'common', 'bundlecaps']
596 gboptslist = ['heads', 'common', 'bundlecaps']
594
597
595 @wireprotocommand('getbundle', '*')
598 @wireprotocommand('getbundle', '*')
596 def getbundle(repo, proto, others):
599 def getbundle(repo, proto, others):
597 opts = options('getbundle', gboptsmap.keys(), others)
600 opts = options('getbundle', gboptsmap.keys(), others)
598 for k, v in opts.iteritems():
601 for k, v in opts.iteritems():
599 keytype = gboptsmap[k]
602 keytype = gboptsmap[k]
600 if keytype == 'nodes':
603 if keytype == 'nodes':
601 opts[k] = decodelist(v)
604 opts[k] = decodelist(v)
602 elif keytype == 'csv':
605 elif keytype == 'csv':
603 opts[k] = list(v.split(','))
606 opts[k] = list(v.split(','))
604 elif keytype == 'scsv':
607 elif keytype == 'scsv':
605 opts[k] = set(v.split(','))
608 opts[k] = set(v.split(','))
606 elif keytype == 'boolean':
609 elif keytype == 'boolean':
607 opts[k] = bool(v)
610 opts[k] = bool(v)
608 elif keytype != 'plain':
611 elif keytype != 'plain':
609 raise KeyError('unknown getbundle option type %s'
612 raise KeyError('unknown getbundle option type %s'
610 % keytype)
613 % keytype)
611 cg = exchange.getbundle(repo, 'serve', **opts)
614 cg = exchange.getbundle(repo, 'serve', **opts)
612 return streamres(proto.groupchunks(cg))
615 return streamres(proto.groupchunks(cg))
613
616
614 @wireprotocommand('heads')
617 @wireprotocommand('heads')
615 def heads(repo, proto):
618 def heads(repo, proto):
616 h = repo.heads()
619 h = repo.heads()
617 return encodelist(h) + "\n"
620 return encodelist(h) + "\n"
618
621
619 @wireprotocommand('hello')
622 @wireprotocommand('hello')
620 def hello(repo, proto):
623 def hello(repo, proto):
621 '''the hello command returns a set of lines describing various
624 '''the hello command returns a set of lines describing various
622 interesting things about the server, in an RFC822-like format.
625 interesting things about the server, in an RFC822-like format.
623 Currently the only one defined is "capabilities", which
626 Currently the only one defined is "capabilities", which
624 consists of a line in the form:
627 consists of a line in the form:
625
628
626 capabilities: space separated list of tokens
629 capabilities: space separated list of tokens
627 '''
630 '''
628 return "capabilities: %s\n" % (capabilities(repo, proto))
631 return "capabilities: %s\n" % (capabilities(repo, proto))
629
632
630 @wireprotocommand('listkeys', 'namespace')
633 @wireprotocommand('listkeys', 'namespace')
631 def listkeys(repo, proto, namespace):
634 def listkeys(repo, proto, namespace):
632 d = repo.listkeys(encoding.tolocal(namespace)).items()
635 d = repo.listkeys(encoding.tolocal(namespace)).items()
633 return pushkeymod.encodekeys(d)
636 return pushkeymod.encodekeys(d)
634
637
635 @wireprotocommand('lookup', 'key')
638 @wireprotocommand('lookup', 'key')
636 def lookup(repo, proto, key):
639 def lookup(repo, proto, key):
637 try:
640 try:
638 k = encoding.tolocal(key)
641 k = encoding.tolocal(key)
639 c = repo[k]
642 c = repo[k]
640 r = c.hex()
643 r = c.hex()
641 success = 1
644 success = 1
642 except Exception as inst:
645 except Exception as inst:
643 r = str(inst)
646 r = str(inst)
644 success = 0
647 success = 0
645 return "%s %s\n" % (success, r)
648 return "%s %s\n" % (success, r)
646
649
647 @wireprotocommand('known', 'nodes *')
650 @wireprotocommand('known', 'nodes *')
648 def known(repo, proto, nodes, others):
651 def known(repo, proto, nodes, others):
649 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
652 return ''.join(b and "1" or "0" for b in repo.known(decodelist(nodes)))
650
653
651 @wireprotocommand('pushkey', 'namespace key old new')
654 @wireprotocommand('pushkey', 'namespace key old new')
652 def pushkey(repo, proto, namespace, key, old, new):
655 def pushkey(repo, proto, namespace, key, old, new):
653 # compatibility with pre-1.8 clients which were accidentally
656 # compatibility with pre-1.8 clients which were accidentally
654 # sending raw binary nodes rather than utf-8-encoded hex
657 # sending raw binary nodes rather than utf-8-encoded hex
655 if len(new) == 20 and new.encode('string-escape') != new:
658 if len(new) == 20 and new.encode('string-escape') != new:
656 # looks like it could be a binary node
659 # looks like it could be a binary node
657 try:
660 try:
658 new.decode('utf-8')
661 new.decode('utf-8')
659 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
662 new = encoding.tolocal(new) # but cleanly decodes as UTF-8
660 except UnicodeDecodeError:
663 except UnicodeDecodeError:
661 pass # binary, leave unmodified
664 pass # binary, leave unmodified
662 else:
665 else:
663 new = encoding.tolocal(new) # normal path
666 new = encoding.tolocal(new) # normal path
664
667
665 if util.safehasattr(proto, 'restore'):
668 if util.safehasattr(proto, 'restore'):
666
669
667 proto.redirect()
670 proto.redirect()
668
671
669 try:
672 try:
670 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
673 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
671 encoding.tolocal(old), new) or False
674 encoding.tolocal(old), new) or False
672 except util.Abort:
675 except util.Abort:
673 r = False
676 r = False
674
677
675 output = proto.restore()
678 output = proto.restore()
676
679
677 return '%s\n%s' % (int(r), output)
680 return '%s\n%s' % (int(r), output)
678
681
679 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
682 r = repo.pushkey(encoding.tolocal(namespace), encoding.tolocal(key),
680 encoding.tolocal(old), new)
683 encoding.tolocal(old), new)
681 return '%s\n' % int(r)
684 return '%s\n' % int(r)
682
685
683 def _allowstream(ui):
686 def _allowstream(ui):
684 return ui.configbool('server', 'uncompressed', True, untrusted=True)
687 return ui.configbool('server', 'uncompressed', True, untrusted=True)
685
688
686 @wireprotocommand('stream_out')
689 @wireprotocommand('stream_out')
687 def stream(repo, proto):
690 def stream(repo, proto):
688 '''If the server supports streaming clone, it advertises the "stream"
691 '''If the server supports streaming clone, it advertises the "stream"
689 capability with a value representing the version and flags of the repo
692 capability with a value representing the version and flags of the repo
690 it is serving. Client checks to see if it understands the format.
693 it is serving. Client checks to see if it understands the format.
691 '''
694 '''
692 if not _allowstream(repo.ui):
695 if not _allowstream(repo.ui):
693 return '1\n'
696 return '1\n'
694
697
695 def getstream(it):
698 def getstream(it):
696 yield '0\n'
699 yield '0\n'
697 for chunk in it:
700 for chunk in it:
698 yield chunk
701 yield chunk
699
702
700 try:
703 try:
701 # LockError may be raised before the first result is yielded. Don't
704 # LockError may be raised before the first result is yielded. Don't
702 # emit output until we're sure we got the lock successfully.
705 # emit output until we're sure we got the lock successfully.
703 it = exchange.generatestreamclone(repo)
706 it = exchange.generatestreamclone(repo)
704 return streamres(getstream(it))
707 return streamres(getstream(it))
705 except error.LockError:
708 except error.LockError:
706 return '2\n'
709 return '2\n'
707
710
708 @wireprotocommand('unbundle', 'heads')
711 @wireprotocommand('unbundle', 'heads')
709 def unbundle(repo, proto, heads):
712 def unbundle(repo, proto, heads):
710 their_heads = decodelist(heads)
713 their_heads = decodelist(heads)
711
714
712 try:
715 try:
713 proto.redirect()
716 proto.redirect()
714
717
715 exchange.check_heads(repo, their_heads, 'preparing changes')
718 exchange.check_heads(repo, their_heads, 'preparing changes')
716
719
717 # write bundle data to temporary file because it can be big
720 # write bundle data to temporary file because it can be big
718 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
721 fd, tempname = tempfile.mkstemp(prefix='hg-unbundle-')
719 fp = os.fdopen(fd, 'wb+')
722 fp = os.fdopen(fd, 'wb+')
720 r = 0
723 r = 0
721 try:
724 try:
722 proto.getfile(fp)
725 proto.getfile(fp)
723 fp.seek(0)
726 fp.seek(0)
724 gen = exchange.readbundle(repo.ui, fp, None)
727 gen = exchange.readbundle(repo.ui, fp, None)
725 r = exchange.unbundle(repo, gen, their_heads, 'serve',
728 r = exchange.unbundle(repo, gen, their_heads, 'serve',
726 proto._client())
729 proto._client())
727 if util.safehasattr(r, 'addpart'):
730 if util.safehasattr(r, 'addpart'):
728 # The return looks streamable, we are in the bundle2 case and
731 # The return looks streamable, we are in the bundle2 case and
729 # should return a stream.
732 # should return a stream.
730 return streamres(r.getchunks())
733 return streamres(r.getchunks())
731 return pushres(r)
734 return pushres(r)
732
735
733 finally:
736 finally:
734 fp.close()
737 fp.close()
735 os.unlink(tempname)
738 os.unlink(tempname)
736
739
737 except (error.BundleValueError, util.Abort, error.PushRaced) as exc:
740 except (error.BundleValueError, util.Abort, error.PushRaced) as exc:
738 # handle non-bundle2 case first
741 # handle non-bundle2 case first
739 if not getattr(exc, 'duringunbundle2', False):
742 if not getattr(exc, 'duringunbundle2', False):
740 try:
743 try:
741 raise
744 raise
742 except util.Abort:
745 except util.Abort:
743 # The old code we moved used sys.stderr directly.
746 # The old code we moved used sys.stderr directly.
744 # We did not change it to minimise code change.
747 # We did not change it to minimise code change.
745 # This need to be moved to something proper.
748 # This need to be moved to something proper.
746 # Feel free to do it.
749 # Feel free to do it.
747 sys.stderr.write("abort: %s\n" % exc)
750 sys.stderr.write("abort: %s\n" % exc)
748 return pushres(0)
751 return pushres(0)
749 except error.PushRaced:
752 except error.PushRaced:
750 return pusherr(str(exc))
753 return pusherr(str(exc))
751
754
752 bundler = bundle2.bundle20(repo.ui)
755 bundler = bundle2.bundle20(repo.ui)
753 for out in getattr(exc, '_bundle2salvagedoutput', ()):
756 for out in getattr(exc, '_bundle2salvagedoutput', ()):
754 bundler.addpart(out)
757 bundler.addpart(out)
755 try:
758 try:
756 try:
759 try:
757 raise
760 raise
758 except error.PushkeyFailed as exc:
761 except error.PushkeyFailed as exc:
759 # check client caps
762 # check client caps
760 remotecaps = getattr(exc, '_replycaps', None)
763 remotecaps = getattr(exc, '_replycaps', None)
761 if (remotecaps is not None
764 if (remotecaps is not None
762 and 'pushkey' not in remotecaps.get('error', ())):
765 and 'pushkey' not in remotecaps.get('error', ())):
763 # no support remote side, fallback to Abort handler.
766 # no support remote side, fallback to Abort handler.
764 raise
767 raise
765 part = bundler.newpart('error:pushkey')
768 part = bundler.newpart('error:pushkey')
766 part.addparam('in-reply-to', exc.partid)
769 part.addparam('in-reply-to', exc.partid)
767 if exc.namespace is not None:
770 if exc.namespace is not None:
768 part.addparam('namespace', exc.namespace, mandatory=False)
771 part.addparam('namespace', exc.namespace, mandatory=False)
769 if exc.key is not None:
772 if exc.key is not None:
770 part.addparam('key', exc.key, mandatory=False)
773 part.addparam('key', exc.key, mandatory=False)
771 if exc.new is not None:
774 if exc.new is not None:
772 part.addparam('new', exc.new, mandatory=False)
775 part.addparam('new', exc.new, mandatory=False)
773 if exc.old is not None:
776 if exc.old is not None:
774 part.addparam('old', exc.old, mandatory=False)
777 part.addparam('old', exc.old, mandatory=False)
775 if exc.ret is not None:
778 if exc.ret is not None:
776 part.addparam('ret', exc.ret, mandatory=False)
779 part.addparam('ret', exc.ret, mandatory=False)
777 except error.BundleValueError as exc:
780 except error.BundleValueError as exc:
778 errpart = bundler.newpart('error:unsupportedcontent')
781 errpart = bundler.newpart('error:unsupportedcontent')
779 if exc.parttype is not None:
782 if exc.parttype is not None:
780 errpart.addparam('parttype', exc.parttype)
783 errpart.addparam('parttype', exc.parttype)
781 if exc.params:
784 if exc.params:
782 errpart.addparam('params', '\0'.join(exc.params))
785 errpart.addparam('params', '\0'.join(exc.params))
783 except util.Abort as exc:
786 except util.Abort as exc:
784 manargs = [('message', str(exc))]
787 manargs = [('message', str(exc))]
785 advargs = []
788 advargs = []
786 if exc.hint is not None:
789 if exc.hint is not None:
787 advargs.append(('hint', exc.hint))
790 advargs.append(('hint', exc.hint))
788 bundler.addpart(bundle2.bundlepart('error:abort',
791 bundler.addpart(bundle2.bundlepart('error:abort',
789 manargs, advargs))
792 manargs, advargs))
790 except error.PushRaced as exc:
793 except error.PushRaced as exc:
791 bundler.newpart('error:pushraced', [('message', str(exc))])
794 bundler.newpart('error:pushraced', [('message', str(exc))])
792 return streamres(bundler.getchunks())
795 return streamres(bundler.getchunks())
General Comments 0
You need to be logged in to leave comments. Login now